diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/Murmur2MessageExtractor.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/Murmur2MessageExtractor.scala
new file mode 100644
index 0000000000..166a620218
--- /dev/null
+++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/Murmur2MessageExtractor.scala
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2019-2020 Lightbend Inc.
+ */
+
+package akka.cluster.sharding.typed
+
+import akka.cluster.sharding.typed.internal.Murmur2
+
+abstract class Murmur2NoEnvelopeMessageExtractor[M](val numberOfShards: Int) extends ShardingMessageExtractor[M, M] {
+  override def shardId(entityId: String): String = Murmur2.shardId(entityId, numberOfShards)
+  override def unwrapMessage(message: M): M = message
+}
+
+/**
+ * The murmur2 message extractor uses the same algorithm as the default Kafka partitioner,
+ * allowing Kafka partitions to be mapped to shards.
+ * This can be used with the [[akka.cluster.sharding.external.ExternalShardAllocationStrategy]] to have messages
+ * processed locally.
+ *
+ * Extend [[Murmur2NoEnvelopeMessageExtractor]] to avoid wrapping messages in a [[ShardingEnvelope]].
+ */
+final class Murmur2MessageExtractor[M](val numberOfShards: Int)
+    extends ShardingMessageExtractor[ShardingEnvelope[M], M] {
+  override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
+  override def shardId(entityId: String): String = Murmur2.shardId(entityId, numberOfShards)
+  override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
+}
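Where messages already carry their entity id, the envelope variant can be skipped by extending the abstract class above. A minimal sketch, assuming a hypothetical `CounterCommand` protocol that is not part of this PR:

```scala
import akka.cluster.sharding.typed.Murmur2NoEnvelopeMessageExtractor

// Hypothetical message protocol: every command carries its target entity id
final case class CounterCommand(entityId: String, delta: Int)

// Derives the entity id from the message itself, so no ShardingEnvelope is
// needed; shard ids still follow Kafka's murmur2 partitioning
final class CounterMessageExtractor(numberOfShards: Int)
    extends Murmur2NoEnvelopeMessageExtractor[CounterCommand](numberOfShards) {
  override def entityId(message: CounterCommand): String = message.entityId
}
```

`numberOfShards` should equal the partition count of the Kafka topic for the shard-to-partition mapping to line up.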
diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/Murmur2.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/Murmur2.scala
new file mode 100644
index 0000000000..3fa79c97cb
--- /dev/null
+++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/Murmur2.scala
@@ -0,0 +1,78 @@
+/*
+ * toPositive and murmur2 based on the Java version in https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+ * to match up with Kafka partitioning. Licensed under:
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.cluster.sharding.typed.internal
+
+import java.nio.charset.StandardCharsets
+
+import akka.annotation.InternalApi
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[sharding] object Murmur2 {
+  def toPositive(number: Int): Int = number & 0x7fffffff
+  def murmur2(data: Array[Byte]): Int = {
+    val length = data.length
+    val seed = 0x9747b28c
+    // 'm' and 'r' are mixing constants generated offline.
+    // They're not really 'magic', they just happen to work well.
+    val m = 0x5bd1e995
+    val r = 24
+    // Initialize the hash to a random value
+    var h = seed ^ length
+    val length4 = length / 4
+    for (i <- 0 until length4) {
+      val i4 = i * 4
+      var k = (data(i4 + 0) & 0xff) + ((data(i4 + 1) & 0xff) << 8) + ((data(i4 + 2) & 0xff) << 16) + ((data(i4 + 3) & 0xff) << 24)
+      k *= m
+      k ^= k >>> r
+      k *= m
+      h *= m
+      h ^= k
+    }
+    // Handle the last few bytes of the input array
+    length % 4 match {
+      case 3 =>
+        h ^= (data((length & ~3) + 2) & 0xff) << 16
+        h ^= (data((length & ~3) + 1) & 0xff) << 8
+        h ^= data(length & ~3) & 0xff
+        h *= m
+      case 2 =>
+        h ^= (data((length & ~3) + 1) & 0xff) << 8
+        h ^= data(length & ~3) & 0xff
+        h *= m
+      case 1 =>
+        h ^= data(length & ~3) & 0xff
+        h *= m
+      case 0 => // length is a multiple of 4, nothing left over
+    }
+
+    h ^= h >>> 13
+    h *= m
+    h ^= h >>> 15
+    h
+  }
+
+  // To match https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L59
+  def shardId(entityId: String, nrShards: Int): String =
+    (toPositive(murmur2(entityId.getBytes(StandardCharsets.UTF_8))) % nrShards).toString
+}
diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala
index 58d0cc16ce..a40014dc34 100644
--- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala
+++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala
@@ -467,7 +467,7 @@ object EntityTypeKey {
  *
  * Please note that an implicit [[akka.util.Timeout]] must be available to use this pattern.
  *
- * Note: it is preferrable to use the non-symbolic ask method as it easier allows for wildcards for
+ * Note: it is preferable to use the non-symbolic ask method as it more easily allows for wildcards for
  * the `replyTo: ActorRef`.
  *
  * @tparam Res The response protocol, what the other actor sends back
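For reference, wiring the extractor into typed sharding could look like the following sketch. `Counter` and `system` are assumed from the docs snippets added later in this PR, and the shard count of 128 is a stand-in for the Kafka topic's partition count:

```scala
import akka.cluster.sharding.typed.Murmur2MessageExtractor
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityTypeKey }

val typeKey = EntityTypeKey[Counter.Command]("Counter")

// 128 is a stand-in: numberOfShards must equal the Kafka topic's partition
// count for shard ids to line up with partition ids
val region = ClusterSharding(system).init(
  Entity(typeKey)(ctx => Counter(ctx.entityId))
    .withMessageExtractor(new Murmur2MessageExtractor[Counter.Command](128)))
```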
diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ExternalShardAllocationCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ExternalShardAllocationCompileOnlyTest.java
new file mode 100644
index 0000000000..9c1f346f5b
--- /dev/null
+++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ExternalShardAllocationCompileOnlyTest.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2020 Lightbend Inc.
+ */
+
+package jdocs.akka.cluster.sharding.typed;
+
+import akka.Done;
+import akka.actor.Address;
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.ActorSystem;
+import akka.cluster.sharding.external.ExternalShardAllocation;
+import akka.cluster.sharding.external.javadsl.ExternalShardAllocationClient;
+import akka.cluster.sharding.typed.ShardingEnvelope;
+import akka.cluster.sharding.typed.javadsl.ClusterSharding;
+import akka.cluster.sharding.typed.javadsl.Entity;
+import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
+
+import java.util.concurrent.CompletionStage;
+
+import static jdocs.akka.cluster.sharding.typed.ShardingCompileOnlyTest.Counter;
+
+public class ExternalShardAllocationCompileOnlyTest {
+
+  void example() {
+    ActorSystem<?> system = null;
+
+    ClusterSharding sharding = ClusterSharding.get(system);
+
+    // #entity
+    EntityTypeKey<Counter.Command> typeKey = EntityTypeKey.create(Counter.Command.class, "Counter");
+
+    ActorRef<ShardingEnvelope<Counter.Command>> shardRegion =
+        sharding.init(Entity.of(typeKey, ctx -> Counter.create(ctx.getEntityId())));
+    // #entity
+
+    // #client
+    ExternalShardAllocationClient client =
+        ExternalShardAllocation.get(system).getClient(typeKey.name());
+    CompletionStage<Done> done =
+        client.setShardLocation("shard-id-1", new Address("akka", "system", "127.0.0.1", 2552));
+    // #client
+
+  }
+}
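Both client flavours surface timeouts as a failed `Future`/`CompletionStage` carrying the `ClientTimeoutException` introduced later in this diff, so callers may want to retry. A hypothetical retry helper, not part of this PR:

```scala
import akka.Done
import akka.actor.{ ActorSystem, Address }
import akka.cluster.sharding.external.ClientTimeoutException
import akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient
import akka.pattern.after

import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical helper: retry an update a few times before giving up, since
// the client fails the returned Future with ClientTimeoutException on timeout.
def updateWithRetry(client: ExternalShardAllocationClient, shard: String, node: Address, retries: Int = 3)(
    implicit system: ActorSystem): Future[Done] = {
  import system.dispatcher
  client.updateShardLocation(shard, node).recoverWith {
    case _: ClientTimeoutException if retries > 0 =>
      after(1.second, system.scheduler)(updateWithRetry(client, shard, node, retries - 1))
  }
}
```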
diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ExternalShardAllocationCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ExternalShardAllocationCompileOnlySpec.scala
new file mode 100644
index 0000000000..81b3edca5e
--- /dev/null
+++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ExternalShardAllocationCompileOnlySpec.scala
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2020 Lightbend Inc.
+ */
+
+package docs.akka.cluster.sharding.typed
+
+import akka.Done
+import akka.actor.Address
+import akka.actor.typed.ActorRef
+import akka.actor.typed.ActorSystem
+import akka.cluster.sharding.external.ExternalShardAllocation
+import akka.cluster.sharding.external.ExternalShardAllocationStrategy
+import akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
+import akka.cluster.sharding.typed.scaladsl.ClusterSharding
+import akka.cluster.sharding.typed.scaladsl.Entity
+import docs.akka.cluster.sharding.typed.ShardingCompileOnlySpec.Basics.Counter
+
+import scala.concurrent.Future
+
+class ExternalShardAllocationCompileOnlySpec {
+  val system: ActorSystem[_] = ???
+
+  val sharding = ClusterSharding(system)
+
+  // #entity
+  val TypeKey = EntityTypeKey[Counter.Command]("Counter")
+
+  val entity = Entity(TypeKey)(createBehavior = entityContext => Counter(entityContext.entityId))
+    .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
+  // #entity
+
+  val shardRegion: ActorRef[ShardingEnvelope[Counter.Command]] =
+    sharding.init(entity)
+
+  // #client
+  val client: ExternalShardAllocationClient = ExternalShardAllocation(system).clientFor(TypeKey.name)
+  val done: Future[Done] = client.updateShardLocation("shard-id-1", Address("akka", "system", "127.0.0.1", 2552))
+  // #client
+
+}
diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf
index bca2ce41be..0c8bd53cb2 100644
--- a/akka-cluster-sharding/src/main/resources/reference.conf
+++ b/akka-cluster-sharding/src/main/resources/reference.conf
@@ -110,6 +110,11 @@ akka.cluster.sharding {
     max-simultaneous-rebalance = 3
   }
 
+  external-shard-allocation-strategy {
+    # How long the client waits to persist an allocation to ddata or to get all shard locations
+    client-timeout = 5s
+  }
+
   # Timeout of waiting the initial distributed state for the shard coordinator (an initial state will be queried again if the timeout happened)
   # and for a shard to get its state when remembered entities is enabled
   # The read from ddata is a ReadMajority, for small clusters (< majority-min-cap) every node needs to respond
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index 9f37225d14..2f369fe40c 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -104,6 +104,22 @@ object ShardCoordinator {
       rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]]
   }
 
+  /**
+   * Shard allocation strategy where `start` is called by the shard coordinator before any calls to
+   * rebalance or allocate shards. This can be used if there is expensive initialization to be done
+   * that you do not want to do in the constructor, as the constructor is run on every node rather
+   * than just the node that hosts the ShardCoordinator.
+   */
+  trait StartableAllocationStrategy extends ShardAllocationStrategy {
+
+    /**
+     * Called before any calls to allocate/rebalance.
+     * Do not block. If asynchronous actions are required they can be started here and
+     * used to delay the Futures returned by allocate/rebalance.
+     */
+    def start(): Unit
+  }
+
   /**
    * Java API: Java implementations of custom shard allocation and rebalancing logic used by the [[ShardCoordinator]]
    * should extend this abstract class and implement the two methods.
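For illustration, a strategy using the new hook might look like the following sketch. `WarmupAllocationStrategy` is hypothetical; the signatures follow the classic `ShardAllocationStrategy` contract shown elsewhere in this diff:

```scala
import akka.Done
import akka.actor.ActorRef
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardRegion.ShardId

import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future, Promise }

// Sketch: defer expensive setup to start(), which only runs on the node
// hosting the ShardCoordinator, and gate allocations on its completion.
class WarmupAllocationStrategy(implicit ec: ExecutionContext)
    extends ShardCoordinator.StartableAllocationStrategy {

  private val ready = Promise[Done]()

  override def start(): Unit = {
    // must not block: kick off asynchronous loading and complete `ready` when finished
    ready.trySuccess(Done)
  }

  override def allocateShard(
      requester: ActorRef,
      shardId: ShardId,
      currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] =
    ready.future.map(_ => requester) // allocation is delayed until start-up work is done

  override def rebalance(
      currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
      rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] =
    Future.successful(Set.empty)
}
```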
@@ -534,6 +550,14 @@ abstract class ShardCoordinator(
 
   protected def typeName: String
 
+  override def preStart(): Unit = {
+    allocationStrategy match {
+      case strategy: StartableAllocationStrategy =>
+        strategy.start()
+      case _ =>
+    }
+  }
+
   override def postStop(): Unit = {
     super.postStop()
     rebalanceTask.cancel()
@@ -600,7 +624,9 @@
           AllocateShardResult(shard, Some(region), getShardHomeSender)
         }
         .recover {
-          case _ => AllocateShardResult(shard, None, getShardHomeSender)
+          case t =>
+            log.error(t, "Shard [{}] allocation failed.", shard)
+            AllocateShardResult(shard, None, getShardHomeSender)
         }
         .pipeTo(self)
     }
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/ClientTimeoutException.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/ClientTimeoutException.scala
new file mode 100644
index 0000000000..a94e7e44a4
--- /dev/null
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/ClientTimeoutException.scala
@@ -0,0 +1,7 @@
+/*
+ * Copyright (C) 2019-2020 Lightbend Inc.
+ */
+
+package akka.cluster.sharding.external
+
+final class ClientTimeoutException(reason: String) extends RuntimeException(reason)
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/ExternalShardAllocation.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/ExternalShardAllocation.scala
new file mode 100644
index 0000000000..b4caafd471
--- /dev/null
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/ExternalShardAllocation.scala
@@ -0,0 +1,55 @@
+/*
+ * Copyright (C) 2019-2020 Lightbend Inc.
+ */
+
+package akka.cluster.sharding.external
+
+import java.util.concurrent.ConcurrentHashMap
+
+import akka.actor.ExtendedActorSystem
+import akka.actor.Extension
+import akka.actor.ExtensionId
+import akka.actor.ExtensionIdProvider
+import akka.cluster.sharding.external.internal.ExternalShardAllocationClientImpl
+import java.util.function.{ Function => JFunction }
+
+import akka.actor.ClassicActorSystemProvider
+import akka.annotation.ApiMayChange
+
+/**
+ * API May Change
+ */
+@ApiMayChange
+final class ExternalShardAllocation(system: ExtendedActorSystem) extends Extension {
+
+  private val clients = new ConcurrentHashMap[String, ExternalShardAllocationClientImpl]
+
+  private val factory = new JFunction[String, ExternalShardAllocationClientImpl] {
+    override def apply(typeName: String): ExternalShardAllocationClientImpl =
+      new ExternalShardAllocationClientImpl(system, typeName)
+  }
+
+  /**
+   * Scala API
+   */
+  def clientFor(typeName: String): scaladsl.ExternalShardAllocationClient = client(typeName)
+
+  /**
+   * Java API
+   */
+  def getClient(typeName: String): javadsl.ExternalShardAllocationClient = client(typeName)
+
+  private def client(typeName: String): ExternalShardAllocationClientImpl = {
+    clients.computeIfAbsent(typeName, factory)
+  }
+}
+
+object ExternalShardAllocation extends ExtensionId[ExternalShardAllocation] with ExtensionIdProvider {
+
+  override def createExtension(system: ExtendedActorSystem): ExternalShardAllocation =
+    new ExternalShardAllocation(system)
+
+  override def lookup(): ExternalShardAllocation.type = ExternalShardAllocation
+
+  override def get(system: ClassicActorSystemProvider): ExternalShardAllocation = super.get(system)
+}
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/ExternalShardAllocationStrategy.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/ExternalShardAllocationStrategy.scala
new file mode 100644
index 0000000000..385a6631b1
--- /dev/null
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/ExternalShardAllocationStrategy.scala
@@ -0,0 +1,219 @@
+/*
+ * Copyright (C) 2019-2020 Lightbend Inc.
+ */
+
+package akka.cluster.sharding.external
+
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+import akka.actor.ActorRefScope
+import akka.actor.Address
+import akka.actor.AddressFromURIString
+import akka.actor.ClassicActorSystemProvider
+import akka.actor.ExtendedActorSystem
+import akka.actor.NoSerializationVerificationNeeded
+import akka.actor.Props
+import akka.actor.Stash
+import akka.cluster.Cluster
+import akka.cluster.ddata.DistributedData
+import akka.cluster.ddata.LWWMapKey
+import akka.cluster.ddata.Replicator.Changed
+import akka.cluster.ddata.Replicator.Subscribe
+import akka.cluster.sharding.ShardCoordinator
+import akka.cluster.sharding.ShardRegion.ShardId
+import akka.event.Logging
+import akka.pattern.AskTimeoutException
+import akka.util.Timeout
+
+import scala.collection.immutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+object ExternalShardAllocationStrategy {
+
+  type ShardRegion = ActorRef
+
+  // local only messages
+  private[akka] final case class GetShardLocation(shard: ShardId)
+  private[akka] case object GetShardLocations
+  private[akka] final case class GetShardLocationsResponse(desiredAllocations: Map[ShardId, Address])
+  private[akka] final case class GetShardLocationResponse(address: Option[Address])
+
+  // only returned locally, serialized as a string
+  final case class ShardLocation(address: Address) extends NoSerializationVerificationNeeded
+
+  private object DDataStateActor {
+    def props(typeName: String): Props = Props(new DDataStateActor(typeName))
+  }
+
+  // uses String values as primitive types are optimized in ddata to not serialize every
+  // entry separately
+  private[akka] def ddataKey(typeName: String): LWWMapKey[ShardId, String] = {
+    LWWMapKey[ShardId, String](s"external-sharding-$typeName")
+  }
+
+  private class DDataStateActor(typeName: String) extends Actor with ActorLogging with Stash {
+
+    private val replicator = DistributedData(context.system).replicator
+
+    private val Key = ddataKey(typeName)
+
+    override def preStart(): Unit = {
+      log.debug("Starting ddata state actor for [{}]", typeName)
+      replicator ! Subscribe(Key, self)
+    }
+
+    var currentLocations: Map[ShardId, String] = Map.empty
+
+    override def receive: Receive = {
+      case c @ Changed(key: LWWMapKey[ShardId, String] @unchecked) =>
+        val newLocations = c.get(key).entries
+        currentLocations ++= newLocations
+        log.debug("Received updated shard locations [{}], all locations are now [{}]", newLocations, currentLocations)
+      case GetShardLocation(shard) =>
+        log.debug("GetShardLocation [{}]", shard)
+        val shardLocation = currentLocations.get(shard).map(asStr => AddressFromURIString(asStr))
+        sender() ! GetShardLocationResponse(shardLocation)
+      case GetShardLocations =>
+        log.debug("GetShardLocations")
+        sender() ! GetShardLocationsResponse(currentLocations.transform((_, asStr) => AddressFromURIString(asStr)))
+    }
+
+  }
+}
+
+class ExternalShardAllocationStrategy(systemProvider: ClassicActorSystemProvider, typeName: String)(
+    // local only ask
+    implicit val timeout: Timeout = Timeout(5.seconds))
+    extends ShardCoordinator.StartableAllocationStrategy {
+
+  private val system = systemProvider.classicSystem
+
+  import ExternalShardAllocationStrategy._
+  import akka.pattern.ask
+  import system.dispatcher
+
+  private val log = Logging(system, classOf[ExternalShardAllocationStrategy])
+
+  private var shardState: ActorRef = _
+
+  private[akka] def createShardStateActor(): ActorRef = {
+    system
+      .asInstanceOf[ExtendedActorSystem]
+      .systemActorOf(DDataStateActor.props(typeName), s"external-allocation-state-$typeName")
+  }
+
+  private val cluster = Cluster(system)
+
+  override def start(): Unit = {
+    shardState = createShardStateActor()
+  }
+
+  override def allocateShard(
+      requester: ShardRegion,
+      shardId: ShardId,
+      currentShardAllocations: Map[ShardRegion, immutable.IndexedSeq[ShardId]]): Future[ShardRegion] = {
+
+    log.debug("allocateShard [{}] [{}] [{}]", shardId, requester, currentShardAllocations)
+
+    // current shard allocations include all current shard regions
+    (shardState ? GetShardLocation(shardId))
+      .mapTo[GetShardLocationResponse]
+      .map {
+        case GetShardLocationResponse(None) =>
+          log.debug("No specific location for shard [{}]. Allocating to requester [{}]", shardId, requester)
+          requester
+        case GetShardLocationResponse(Some(address)) =>
+          // if it is the local address, find the local shard region among the current allocations
+          if (address == cluster.selfAddress) {
+            currentShardAllocations.keys.find(_.path.address.hasLocalScope) match {
+              case None =>
+                log.debug("Unable to find local shard region in currentShardAllocations. Using requester")
+                requester
+              case Some(localShardRegion) =>
+                log.debug("Allocating to local shard region")
+                localShardRegion
+            }
+          } else {
+            currentShardAllocations.keys.find(_.path.address == address) match {
+              case None =>
+                log.debug(
+                  "External shard location [{}] for shard [{}] not found in members [{}]",
+                  address,
+                  shardId,
+                  currentShardAllocations.keys.mkString(","))
+                requester
+              case Some(location) =>
+                log.debug("Allocating shard to location [{}]", location)
+                location
+            }
+          }
+      }
+      .recover {
+        case _: AskTimeoutException =>
+          log.warning(
+            "allocate timed out waiting for shard allocation state [{}]. Allocating to requester [{}]",
+            shardId,
+            requester)
+          requester
+      }
+
+  }
+
+  override def rebalance(
+      currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
+      rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
+
+    log.debug("rebalance [{}] [{}]", currentShardAllocations, rebalanceInProgress)
+
+    val currentAllocationByAddress: Map[Address, immutable.IndexedSeq[ShardId]] = currentShardAllocations.map {
+      case (ref: ActorRefScope, value) if ref.isLocal =>
+        (cluster.selfAddress, value) // so it can be compared to an address with host and port
+      case (key, value) => (key.path.address, value)
+    }
+
+    val currentlyAllocatedShards: Set[ShardId] = currentShardAllocations.foldLeft(Set.empty[ShardId]) {
+      case (acc, next) => acc ++ next._2.toSet
+    }
+
+    log.debug("Current allocations by address: [{}]", currentAllocationByAddress)
+
+    val shardsThatNeedRebalanced: Future[Set[ShardId]] = for {
+      desiredMappings <- (shardState ? GetShardLocations).mapTo[GetShardLocationsResponse]
+    } yield {
+      log.debug("desired allocations: [{}]", desiredMappings.desiredAllocations)
+      desiredMappings.desiredAllocations.filter {
+        case (shardId, expectedLocation) if currentlyAllocatedShards.contains(shardId) =>
+          currentAllocationByAddress.get(expectedLocation) match {
+            case None =>
+              log.debug(
+                "Shard [{}] desired location [{}] is not part of the cluster, not rebalancing",
+                shardId,
+                expectedLocation)
+              false // not a current allocation so don't rebalance yet
+            case Some(shards) =>
+              val isInCorrectLocation = shards.contains(shardId)
+              !isInCorrectLocation
+          }
+        case (shardId, _) =>
+          log.debug("Shard [{}] not currently allocated so not rebalancing to desired location", shardId)
+          false
+      }
+    }.keys.toSet
+
+    shardsThatNeedRebalanced
+      .map { shards =>
+        if (shards.nonEmpty) {
+          log.debug("Shards not currently in their desired location [{}]", shards)
+        }
+        shards
+      }
+      .recover {
+        case _: AskTimeoutException =>
+          log.warning("rebalance timed out waiting for shard allocation state. Keeping existing allocations")
+          Set.empty
+      }
+  }
+
+}
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/ShardLocations.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/ShardLocations.scala
new file mode 100644
index 0000000000..dca25603ae
--- /dev/null
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/ShardLocations.scala
@@ -0,0 +1,17 @@
+/*
+ * Copyright (C) 2019-2020 Lightbend Inc.
+ */
+
+package akka.cluster.sharding.external
+
+import akka.cluster.sharding.ShardRegion.ShardId
+import akka.cluster.sharding.external.ExternalShardAllocationStrategy.ShardLocation
+import akka.util.ccompat.JavaConverters._
+
+final class ShardLocations(val locations: Map[ShardId, ShardLocation]) {
+
+  /**
+   * Java API
+   */
+  def getShardLocations(): java.util.Map[ShardId, ShardLocation] = locations.asJava
+}
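Reading the locations back goes through the same client API. A sketch, assuming a running clustered system and a hypothetical entity type name `Counter`:

```scala
import akka.actor.typed.ActorSystem
import akka.cluster.sharding.external.ExternalShardAllocation

import scala.util.{ Failure, Success }

// Sketch: dump the currently registered locations for an entity type.
// The type name "Counter" and a running clustered `system` are assumptions.
def logShardLocations(system: ActorSystem[_]): Unit = {
  import system.executionContext
  val client = ExternalShardAllocation(system).clientFor("Counter")
  client.shardLocations().onComplete {
    case Success(locations) =>
      locations.locations.foreach {
        case (shard, location) => system.log.info("shard [{}] is at [{}]", shard, location.address)
      }
    case Failure(ex) =>
      // fails with ClientTimeoutException if the read cannot complete in time
      system.log.warn("Could not read shard locations", ex)
  }
}
```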
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/internal/ExternalShardAllocationClientImpl.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/internal/ExternalShardAllocationClientImpl.scala
new file mode 100644
index 0000000000..d315a11ac8
--- /dev/null
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/internal/ExternalShardAllocationClientImpl.scala
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2019-2020 Lightbend Inc.
+ */
+
+package akka.cluster.sharding.external.internal
+
+import java.util.concurrent.CompletionStage
+
+import akka.Done
+import akka.actor.ActorRef
+import akka.actor.ActorSystem
+import akka.actor.Address
+import akka.actor.AddressFromURIString
+import akka.annotation.InternalApi
+import akka.cluster.ddata.DistributedData
+import akka.cluster.ddata.LWWMap
+import akka.cluster.ddata.Replicator.Get
+import akka.cluster.ddata.Replicator.GetFailure
+import akka.cluster.ddata.Replicator.GetSuccess
+import akka.cluster.ddata.Replicator.NotFound
+import akka.cluster.ddata.Replicator.ReadMajority
+import akka.cluster.ddata.Replicator.Update
+import akka.cluster.ddata.Replicator.UpdateSuccess
+import akka.cluster.ddata.Replicator.UpdateTimeout
+import akka.cluster.ddata.Replicator.WriteMajority
+import akka.cluster.ddata.SelfUniqueAddress
+import akka.cluster.sharding.ShardRegion.ShardId
+import akka.cluster.sharding.external.ClientTimeoutException
+import akka.cluster.sharding.external.ExternalShardAllocationStrategy
+import akka.cluster.sharding.external.ExternalShardAllocationStrategy.ShardLocation
+import akka.cluster.sharding.external.ShardLocations
+import akka.event.Logging
+import akka.util.Timeout
+import akka.util.PrettyDuration._
+import akka.pattern.ask
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.compat.java8.FutureConverters._
+import akka.util.JavaDurationConverters._
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+final private[external] class ExternalShardAllocationClientImpl(system: ActorSystem, typeName: String)
+    extends akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient
+    with akka.cluster.sharding.external.javadsl.ExternalShardAllocationClient {
+
+  private val log = Logging(system, classOf[ExternalShardAllocationClientImpl])
+
+  private val replicator: ActorRef = DistributedData(system).replicator
+  private val self: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
+
+  private val timeout =
+    system.settings.config
+      .getDuration("akka.cluster.sharding.external-shard-allocation-strategy.client-timeout")
+      .asScala
+  private implicit val askTimeout: Timeout = Timeout(timeout * 2)
+  private implicit val ec: ExecutionContext = system.dispatchers.internalDispatcher
+
+  private val Key = ExternalShardAllocationStrategy.ddataKey(typeName)
+
+  override def updateShardLocation(shard: ShardId, location: Address): Future[Done] = {
+    log.debug("updateShardLocation {} {} key {}", shard, location, Key)
+    (replicator ? Update(Key, LWWMap.empty[ShardId, String], WriteMajority(timeout), None) { existing =>
+      existing.put(self, shard, location.toString)
+    }).flatMap {
+      case UpdateSuccess(_, _) => Future.successful(Done)
+      case UpdateTimeout =>
+        Future.failed(new ClientTimeoutException(s"Unable to update shard location after ${timeout.pretty}"))
+    }
+  }
+
+  override def setShardLocation(shard: ShardId, location: Address): CompletionStage[Done] =
+    updateShardLocation(shard, location).toJava
+
+  override def shardLocations(): Future[ShardLocations] = {
+    (replicator ? Get(Key, ReadMajority(timeout)))
+      .flatMap {
+        case success @ GetSuccess(`Key`, _) =>
+          Future.successful(
+            success.get(Key).entries.transform((_, asStr) => ShardLocation(AddressFromURIString(asStr))))
+        case NotFound(_, _) =>
+          // no shard locations have been set yet
+          Future.successful(Map.empty[ShardId, ShardLocation])
+        case GetFailure(_, _) =>
+          Future.failed(new ClientTimeoutException(s"Unable to get shard locations after ${timeout.pretty}"))
+      }
+      .map { locations =>
+        new ShardLocations(locations)
+      }
+  }
+
+  override def getShardLocations(): CompletionStage[ShardLocations] = shardLocations().toJava
+}
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/javadsl/ExternalShardAllocationClient.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/javadsl/ExternalShardAllocationClient.scala
new file mode 100644
index 0000000000..fd561757c3
--- /dev/null
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/javadsl/ExternalShardAllocationClient.scala
@@ -0,0 +1,39 @@
+/*
+ * Copyright (C) 2019-2020 Lightbend Inc.
+ */
+
+package akka.cluster.sharding.external.javadsl
+
+import java.util.concurrent.CompletionStage
+
+import akka.Done
+import akka.actor.Address
+import akka.annotation.ApiMayChange
+import akka.cluster.sharding.ShardRegion.ShardId
+import akka.cluster.sharding.external.ShardLocations
+
+/**
+ * API May Change
+ *
+ * Not for user extension
+ */
+@ApiMayChange
+trait ExternalShardAllocationClient {
+
+  /**
+   * Update the given shard's location. The [[Address]] should
+   * match one of the nodes in the cluster. If the node has not joined
+   * the cluster yet, the shard will be moved to that node after the next
+   * cluster sharding rebalance once it has joined.
+   *
+   * @param shard The shard identifier
+   * @param location Location (akka node) to allocate the shard to
+   * @return Confirmation that the update has been propagated to a majority of cluster nodes
+   */
+  def setShardLocation(shard: ShardId, location: Address): CompletionStage[Done]
+
+  /**
+   * Get all the current shard locations that have been set via setShardLocation
+   */
+  def getShardLocations(): CompletionStage[ShardLocations]
+}
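A typical use, hinted at by the Kafka-partitioner extractor earlier in this diff, is to pin shards to the node that consumes the matching Kafka partition. A sketch of what a partition-assignment callback could do; the type name and the shard-id-equals-partition-id convention are assumptions:

```scala
import akka.Done
import akka.actor.Address
import akka.actor.typed.ActorSystem
import akka.cluster.Cluster
import akka.cluster.sharding.external.ExternalShardAllocation

import scala.concurrent.Future

// Sketch: when this node is assigned Kafka partitions, record it as the
// location of the matching shards. Assumes shard id == partition id and an
// entity type named "Counter"; neither is mandated by the API.
def onPartitionsAssigned(system: ActorSystem[_], partitions: Set[Int]): Future[Done] = {
  import system.executionContext
  val client = ExternalShardAllocation(system).clientFor("Counter")
  val selfAddress: Address = Cluster(system.classicSystem).selfAddress
  Future
    .traverse(partitions.toList)(partition => client.updateShardLocation(partition.toString, selfAddress))
    .map(_ => Done)
}
```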
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/scaladsl/ExternalShardAllocationClient.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/scaladsl/ExternalShardAllocationClient.scala
new file mode 100644
index 0000000000..8f096f6a73
--- /dev/null
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/scaladsl/ExternalShardAllocationClient.scala
@@ -0,0 +1,39 @@
+/*
+ * Copyright (C) 2019-2020 Lightbend Inc.
+ */
+
+package akka.cluster.sharding.external.scaladsl
+
+import akka.Done
+import akka.actor.Address
+import akka.annotation.ApiMayChange
+import akka.cluster.sharding.ShardRegion.ShardId
+import akka.cluster.sharding.external.ShardLocations
+
+import scala.concurrent.Future
+
+/**
+ * API May Change
+ *
+ * Not for user extension
+ */
+@ApiMayChange
+trait ExternalShardAllocationClient {
+
+  /**
+   * Update the given shard's location. The [[Address]] should
+   * match one of the nodes in the cluster. If the node has not joined
+   * the cluster yet, the shard will be moved to that node after the next
+   * cluster sharding rebalance once it has joined.
+   *
+   * @param shard The shard identifier
+   * @param location Location (akka node) to allocate the shard to
+   * @return Confirmation that the update has been propagated to a majority of cluster nodes
+   */
+  def updateShardLocation(shard: ShardId, location: Address): Future[Done]
+
+  /**
+   * Get all the current shard locations that have been set via updateShardLocation
+   */
+  def shardLocations(): Future[ShardLocations]
+}
diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ExternalShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ExternalShardAllocationSpec.scala
new file mode 100644
index 0000000000..8115f23c1c
--- /dev/null
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ExternalShardAllocationSpec.scala
@@ -0,0 +1,185 @@
+/*
+ * Copyright (C) 2019-2020 Lightbend Inc.
+ */
+
+package akka.cluster.sharding
+
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.Address
+import akka.actor.PoisonPill
+import akka.actor.Props
+import akka.cluster.Cluster
+import akka.cluster.MultiNodeClusterSpec
+import akka.cluster.sharding.ExternalShardAllocationSpec.GiveMeYourHome.Get
+import akka.cluster.sharding.ExternalShardAllocationSpec.GiveMeYourHome.Home
+import akka.cluster.sharding.external.ExternalShardAllocation
+import akka.cluster.sharding.external.ExternalShardAllocationStrategy
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.serialization.jackson.CborSerializable
+import akka.testkit.ImplicitSender
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import org.scalatest.concurrent.ScalaFutures
+
+import scala.concurrent.duration._
+
+object ExternalShardAllocationSpecConfig extends MultiNodeConfig {
+
+  commonConfig(ConfigFactory.parseString("""
+    akka.loglevel = INFO
+    akka.actor.provider = "cluster"
+    akka.cluster.sharding {
+      distributed-data.durable.lmdb {
+        dir = target/ExternalShardAllocationSpec/sharding-ddata
+        map-size = 10 MiB
+      }
+      retry-interval = 2000ms
+      waiting-for-state-timeout = 2000ms
+      rebalance-interval = 1s
+    }
+    """).withFallback(MultiNodeClusterSpec.clusterConfig))
+
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+  val fourth = role("fourth")
+}
+
+class ExternalShardAllocationSpecMultiJvmNode1 extends ExternalShardAllocationSpec
+class ExternalShardAllocationSpecMultiJvmNode2 extends ExternalShardAllocationSpec
+class ExternalShardAllocationSpecMultiJvmNode3 extends ExternalShardAllocationSpec
+class ExternalShardAllocationSpecMultiJvmNode4 extends ExternalShardAllocationSpec
+
+object ExternalShardAllocationSpec {
+
+  object GiveMeYourHome {
+    case class Get(id: String) extends CborSerializable
+    case class Home(address: Address) extends CborSerializable
+
+    val extractEntityId: ShardRegion.ExtractEntityId = {
+      case g @ Get(id) => (id, g)
+    }
+
+    // shard == id to make testing easier
+    val extractShardId: ShardRegion.ExtractShardId = {
+      case Get(id) => id
+    }
+  }
+
+  class GiveMeYourHome extends Actor with ActorLogging {
+
+    val selfAddress = Cluster(context.system).selfAddress
+
+    log.info("Started on {}", selfAddress)
+
+    override def receive: Receive = {
+      case Get(_) =>
+        sender() ! Home(selfAddress)
+    }
+  }
+}
+
+abstract class ExternalShardAllocationSpec
+    extends MultiNodeSpec(ExternalShardAllocationSpecConfig)
+    with MultiNodeClusterSpec
+    with ImplicitSender
+    with ScalaFutures {
+
+  import ExternalShardAllocationSpecConfig._
+  import ExternalShardAllocationSpec._
+  import ExternalShardAllocationSpec.GiveMeYourHome._
+
+  override implicit val patienceConfig: PatienceConfig = PatienceConfig(5.seconds)
+
+  val typeName = "home"
+  val initiallyOnFourth = "on-fourth"
+
+  "External shard allocation" must {
+    "form cluster" in {
+      awaitClusterUp(first, second, third)
+      enterBarrier("cluster-started")
+    }
+
+    lazy val shardRegion = {
+      ClusterSharding(system).start(
+        typeName = typeName,
+        entityProps = Props[GiveMeYourHome](),
+        settings = ClusterShardingSettings(system),
+        extractEntityId = extractEntityId,
+        extractShardId = extractShardId,
+        new ExternalShardAllocationStrategy(system, typeName),
+        PoisonPill)
+    }
+
+    "start cluster sharding" in {
+      shardRegion
+      enterBarrier("shard-region-started")
+    }
+
+    "default to allocating a shard to the local shard region" in {
+      runOn(first, second, third) {
+        shardRegion ! Get(myself.name)
+        val actorLocation = expectMsgType[Home](20.seconds).address
+        actorLocation shouldEqual Cluster(system).selfAddress
+      }
+      enterBarrier("local-message-sent")
+    }
+
+    "move shard via distributed data" in {
+      val shardToSpecifyLocation = "cats"
+      runOn(first) {
+        ExternalShardAllocation(system)
+          .clientFor(typeName)
+          .updateShardLocation(shardToSpecifyLocation, Cluster(system).selfAddress)
+          .futureValue
+      }
+      enterBarrier("shard-location-updated")
+
+      runOn(second, third) {
+        val probe = TestProbe()
+        awaitAssert({
+          shardRegion.tell(Get(shardToSpecifyLocation), probe.ref)
+          probe.expectMsg(Home(address(first)))
+        }, 10.seconds)
+      }
+      enterBarrier("shard-allocated-to-specific-node")
+    }
+
+    "allocate to a node that does not exist yet" in {
+      val fourthAddress = address(fourth)
+      runOn(second) {
+        system.log.info("Allocating {} on {}", initiallyOnFourth, fourthAddress)
+        ExternalShardAllocation(system).clientFor(typeName).updateShardLocation(initiallyOnFourth, fourthAddress).futureValue
+      }
+      enterBarrier("allocated-to-new-node")
+      runOn(fourth) {
+        joinWithin(first)
+      }
+      enterBarrier("fourth-node-joined")
+      runOn(first, second, third) {
+        awaitAssert({
+          shardRegion ! Get(initiallyOnFourth)
+          expectMsg(Home(address(fourth)))
+        }, 10.seconds)
+      }
+      enterBarrier("shard-allocated-to-fourth")
+    }
+
+    "move allocation" in {
+      runOn(third) {
+        system.log.info("Moving shard from fourth to first: {}", address(first))
+        ExternalShardAllocation(system).clientFor(typeName).updateShardLocation(initiallyOnFourth, address(first)).futureValue
+      }
+      enterBarrier("shard-moved-from-fourth-to-first")
+      runOn(first, second, third, fourth) {
+        awaitAssert({
+          shardRegion ! Get(initiallyOnFourth)
+          expectMsg(Home(address(first)))
+        }, 10.seconds)
+      }
+      enterBarrier("finished")
+    }
+  }
+}
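The multi-jvm test above exercises the whole path; the rebalance decision itself is plain set logic. An illustration with hypothetical data, mirroring `ExternalShardAllocationStrategy.rebalance`:

```scala
import akka.actor.Address

// Hypothetical snapshot mirroring the inputs to rebalance
val node1 = Address("akka", "system", "10.0.0.1", 2552)
val node2 = Address("akka", "system", "10.0.0.2", 2552)

val desired = Map("0" -> node1, "1" -> node2) // client-supplied locations
val currentByAddress = Map(node1 -> Vector("0", "1"), node2 -> Vector.empty[String])

val toRebalance = desired.collect {
  case (shard, wanted)
      if currentByAddress.values.exists(_.contains(shard)) && // shard is currently allocated
      currentByAddress.get(wanted).exists(!_.contains(shard)) => // desired node is up, shard is elsewhere
    shard
}.toSet
// toRebalance == Set("1"): shard "1" sits on node1 but should be on node2
```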
diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/external/ExternalShardAllocationStrategySpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/external/ExternalShardAllocationStrategySpec.scala
new file mode 100644
index 0000000000..b40a77e718
--- /dev/null
+++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/external/ExternalShardAllocationStrategySpec.scala
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2019-2020 Lightbend Inc.
+ */
+
+package akka.cluster.sharding.external
+
+import akka.cluster.sharding.external.ExternalShardAllocationStrategy.GetShardLocation
+import akka.cluster.sharding.external.ExternalShardAllocationStrategy.GetShardLocationResponse
+import akka.cluster.sharding.external.ExternalShardAllocationStrategy.GetShardLocations
+import akka.testkit.AkkaSpec
+import akka.testkit.TestProbe
+import akka.util.Timeout
+import org.scalatest.concurrent.ScalaFutures
+
+import scala.concurrent.duration._
+
+class ExternalShardAllocationStrategySpec
+    extends AkkaSpec("""
+    akka.actor.provider = cluster
+    akka.loglevel = INFO
+    """)
+    with ScalaFutures {
+
+  // futureValue must wait longer than the strategy's 250ms ask timeout
+  override implicit val patienceConfig: PatienceConfig = PatienceConfig(1.second)
+
+  val requester = TestProbe()
+
+  "ExternalShardAllocation allocate" must {
+    "default to requester if query times out" in {
+      val (strategy, _) = createStrategy()
+      strategy.allocateShard(requester.ref, "shard-1", Map.empty).futureValue shouldEqual requester.ref
+    }
+    "default to requester if no allocation" in {
+      val (strategy, probe) = createStrategy()
+      val allocation = strategy.allocateShard(requester.ref, "shard-1", Map.empty)
+      probe.expectMsg(GetShardLocation("shard-1"))
+      probe.reply(GetShardLocationResponse(None))
+      allocation.futureValue shouldEqual requester.ref
+    }
+  }
+
+  "ExternalShardAllocation rebalance" must {
+    "default to no rebalance if query times out" in {
+      val (strategy, probe) = createStrategy()
+      val rebalance = strategy.rebalance(Map.empty, Set.empty)
+      probe.expectMsg(GetShardLocations)
+      rebalance.futureValue shouldEqual Set.empty
+    }
+  }
+
+  def createStrategy(): (ExternalShardAllocationStrategy, TestProbe) = {
+    val probe = TestProbe()
+    val strategy = new ExternalShardAllocationStrategy(system, "type")(Timeout(250.millis)) {
+      override private[akka] def createShardStateActor() = probe.ref
+    }
+    strategy.start()
+    (strategy, probe)
+  }
+
+}
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
index 855a1e1d27..ee5376e0d4 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
@@ -1544,11 +1544,12 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
           case None => NotFound(key, req)
         }
         replyTo ! reply
-      } else
+      } else {
         context.actorOf(
           ReadAggregator
             .props(key, consistency, req, selfUniqueAddress, nodes, unreachable, localValue, replyTo)
             .withDispatcher(context.props.dispatcher))
+      }
     }
 
   def isLocalGet(readConsistency: ReadConsistency): Boolean =
diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md
index 9355d143d3..abb26e7e89 100644
--- a/akka-docs/src/main/paradox/typed/cluster-sharding.md
+++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md
@@ -162,10 +162,40 @@ The `number-of-shards` configuration value must be the same for all nodes in the
 configuration check when joining. Changing the value requires stopping all nodes in the cluster.
 
 The shards are allocated to the nodes in the cluster. The decision of where to allocate a shard is done
-by a shard allocation strategy. The default implementation `akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy`
+by a shard allocation strategy. The default implementation @apidoc[ShardCoordinator.LeastShardAllocationStrategy]
 allocates new shards to the `ShardRegion` (node) with least number of previously allocated shards.
 This strategy can be replaced by an application specific implementation.
-
+
+### External shard allocation
+
+An alternative allocation strategy is the @apidoc[ExternalShardAllocationStrategy] which allows
+explicit control over where shards are allocated via the @apidoc[ExternalShardAllocation] extension.
+This can be used, for example, to match up Kafka partition consumption with shard locations.
+
+To use it, set it as the allocation strategy on your `Entity`:
+
+Scala
+:  @@snip [ExternalShardAllocationCompileOnlySpec](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ExternalShardAllocationCompileOnlySpec.scala) { #entity }
+
+Java
+:  @@snip [ExternalShardAllocationCompileOnlyTest](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ExternalShardAllocationCompileOnlyTest.java) { #entity }
+
+Any shard that has not been given an explicit location will be allocated to the requesting node. To make explicit allocations:
+
+Scala
+:  @@snip [ExternalShardAllocationCompileOnlySpec](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ExternalShardAllocationCompileOnlySpec.scala) { #client }
+
+Java
+:  @@snip [ExternalShardAllocationCompileOnlyTest](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ExternalShardAllocationCompileOnlyTest.java) { #client }
+
+Locations set before a shard is first allocated take effect as soon as the shard is allocated; changing the
+location of an already-allocated shard takes effect on the next rebalance.
+
+The communication from the client to the shard allocation strategy is via @ref[Distributed Data](./distributed-data.md).
+It uses a single `LWWMap` that can support tens of thousands of shards. Later versions could use multiple keys to
+support a greater number of shards.
+
+### Custom shard allocation
+
 An optional custom shard allocation strategy can be passed into the optional parameter when initializing an entity type
 or explicitly using the `withAllocationStrategy` function.
 See the API documentation of @scala[`akka.cluster.sharding.ShardAllocationStrategy`]@java[`akka.cluster.sharding.AbstractShardAllocationStrategy`] for details of how to implement a custom `ShardAllocationStrategy`.
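To make the custom-strategy option concrete, here is a minimal sketch against the classic `ShardAllocationStrategy` contract; `FewestShardsAllocationStrategy` is hypothetical and not part of this PR:

```scala
import akka.actor.ActorRef
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.ShardId

import scala.collection.immutable
import scala.concurrent.Future

// Sketch: allocate each new shard to the region currently holding the fewest
// shards and never rebalance. The coordinator always passes at least one
// region, so minBy is safe here.
class FewestShardsAllocationStrategy extends ShardAllocationStrategy {

  override def allocateShard(
      requester: ActorRef,
      shardId: ShardId,
      currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] = {
    val (regionWithFewest, _) = currentShardAllocations.minBy { case (_, shards) => shards.size }
    Future.successful(regionWithFewest)
  }

  override def rebalance(
      currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
      rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] =
    Future.successful(Set.empty)
}
```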