diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Ask.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Ask.scala index ee4d4d2db6..27e283c91a 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Ask.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Ask.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ package akka.actor.typed package javadsl @@ -5,12 +8,26 @@ import java.util.concurrent.CompletionStage import akka.actor.Scheduler import akka.actor.typed.scaladsl.AskPattern._ -import akka.japi.function.Function +import akka.japi.function.{ Function ⇒ JFunction } import akka.util.Timeout -import scala.compat.java8.FutureConverters +import scala.compat.java8.FutureConverters._ +/** + * The ask-pattern implements the initiator side of a request–reply protocol. + * + * Note that if you are inside of an actor you should prefer [[ActorContext.ask]] + * as that provides better safety. + * + * The party that asks may be within or without an Actor, since the + * implementation will fabricate a (hidden) [[ActorRef]] that is bound to a + * `CompletableFuture`. This ActorRef will need to be injected in the + * message that is sent to the target Actor in order to function as a reply-to + * address, therefore the argument to the ask method is not the message itself + * but a function that given the reply-to address will create the message. + * + */ object AskPattern { - def ask[T, U](actor: ActorRef[T], message: Function[ActorRef[U], T], timeout: Timeout, scheduler: Scheduler): CompletionStage[U] = - FutureConverters.toJava[U](actor.?(message.apply)(timeout, scheduler)) + def ask[T, U](actor: ActorRef[T], message: JFunction[ActorRef[U], T], timeout: Timeout, scheduler: Scheduler): CompletionStage[U] = + (actor.?(message.apply)(timeout, scheduler)).toJava } diff --git a/akka-cluster-sharding-typed/src/main/java/akka/cluster/sharding/typed/internal/protobuf/ShardingMessages.java b/akka-cluster-sharding-typed/src/main/java/akka/cluster/sharding/typed/internal/protobuf/ShardingMessages.java index 3ae871d7ca..7ea8037dff 100644 --- a/akka-cluster-sharding-typed/src/main/java/akka/cluster/sharding/typed/internal/protobuf/ShardingMessages.java +++ b/akka-cluster-sharding-typed/src/main/java/akka/cluster/sharding/typed/internal/protobuf/ShardingMessages.java @@ -26,17 +26,17 @@ public final class ShardingMessages { akka.protobuf.ByteString getEntityIdBytes(); - // optional .Payload message = 2; + // required .Payload message = 2; /** - * optional .Payload message = 2; + * required .Payload message = 2; */ boolean hasMessage(); /** - * optional .Payload message = 2; + * required .Payload message = 2; */ akka.remote.ContainerFormats.Payload getMessage(); /** - * optional .Payload message = 2; + * required .Payload message = 2; */ akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder(); } @@ -192,23 +192,23 @@ public final class ShardingMessages { } } - // optional .Payload message = 2; + // required .Payload message = 2; public static final int MESSAGE_FIELD_NUMBER = 2; private akka.remote.ContainerFormats.Payload message_; /** - * optional .Payload message = 2; + * required .Payload message = 2; */ public boolean hasMessage() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** - * optional .Payload message = 2; + * required .Payload message = 2; */ public akka.remote.ContainerFormats.Payload getMessage() { return message_; } /** - * optional .Payload message = 2; + * required .Payload message = 2; */ public akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder() { return message_; @@ -227,11 +227,13 @@ public final class ShardingMessages { memoizedIsInitialized = 0; return false; } - if (hasMessage()) { - if (!getMessage().isInitialized()) { - memoizedIsInitialized = 0; - return false; - } + if (!hasMessage()) { + memoizedIsInitialized = 0; + return false; + } + if (!getMessage().isInitialized()) { + memoizedIsInitialized = 0; + return false; } memoizedIsInitialized = 1; return true; @@ -461,11 +463,13 @@ public final class ShardingMessages { return false; } - if (hasMessage()) { - if (!getMessage().isInitialized()) { - - return false; - } + if (!hasMessage()) { + + return false; + } + if (!getMessage().isInitialized()) { + + return false; } return true; } @@ -563,18 +567,18 @@ public final class ShardingMessages { return this; } - // optional .Payload message = 2; + // required .Payload message = 2; private akka.remote.ContainerFormats.Payload message_ = akka.remote.ContainerFormats.Payload.getDefaultInstance(); private akka.protobuf.SingleFieldBuilder< akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder> messageBuilder_; /** - * optional .Payload message = 2; + * required .Payload message = 2; */ public boolean hasMessage() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** - * optional .Payload message = 2; + * required .Payload message = 2; */ public akka.remote.ContainerFormats.Payload getMessage() { if (messageBuilder_ == null) { @@ -584,7 +588,7 @@ public final class ShardingMessages { } } /** - * optional .Payload message = 2; + * required .Payload message = 2; */ public Builder setMessage(akka.remote.ContainerFormats.Payload value) { if (messageBuilder_ == null) { @@ -600,7 +604,7 @@ public final class ShardingMessages { return this; } /** - * optional .Payload message = 2; + * required .Payload message = 2; */ public Builder setMessage( akka.remote.ContainerFormats.Payload.Builder builderForValue) { @@ -614,7 +618,7 @@ public final class ShardingMessages { return this; } /** - * optional .Payload message = 2; + * required .Payload message = 2; */ public Builder mergeMessage(akka.remote.ContainerFormats.Payload value) { if (messageBuilder_ == null) { @@ -633,7 +637,7 @@ public final class ShardingMessages { return this; } /** - * optional .Payload message = 2; + * required .Payload message = 2; */ public Builder clearMessage() { if (messageBuilder_ == null) { @@ -646,7 +650,7 @@ public final class ShardingMessages { return this; } /** - * optional .Payload message = 2; + * required .Payload message = 2; */ public akka.remote.ContainerFormats.Payload.Builder getMessageBuilder() { bitField0_ |= 0x00000002; @@ -654,7 +658,7 @@ public final class ShardingMessages { return getMessageFieldBuilder().getBuilder(); } /** - * optional .Payload message = 2; + * required .Payload message = 2; */ public akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder() { if (messageBuilder_ != null) { @@ -664,7 +668,7 @@ public final class ShardingMessages { } } /** - * optional .Payload message = 2; + * required .Payload message = 2; */ private akka.protobuf.SingleFieldBuilder< akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder> @@ -708,7 +712,7 @@ public final class ShardingMessages { "\n\026ShardingMessages.proto\022\033akka.cluster.s" + "harding.typed\032\026ContainerFormats.proto\"?\n" + "\020ShardingEnvelope\022\020\n\010entityId\030\001 \002(\t\022\031\n\007m" + - "essage\030\002 \001(\0132\010.PayloadB1\n-akka.cluster.s" + + "essage\030\002 \002(\0132\010.PayloadB1\n-akka.cluster.s" + "harding.typed.internal.protobufH\001" }; akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = diff --git a/akka-cluster-sharding-typed/src/main/protobuf/ShardingMessages.proto b/akka-cluster-sharding-typed/src/main/protobuf/ShardingMessages.proto index 75ae86745b..6c4cd955e4 100644 --- a/akka-cluster-sharding-typed/src/main/protobuf/ShardingMessages.proto +++ b/akka-cluster-sharding-typed/src/main/protobuf/ShardingMessages.proto @@ -9,5 +9,5 @@ import "ContainerFormats.proto"; message ShardingEnvelope { required string entityId = 1; - optional Payload message = 2; + required Payload message = 2; } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterSharding.scala deleted file mode 100644 index c418d93b01..0000000000 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterSharding.scala +++ /dev/null @@ -1,510 +0,0 @@ -/* - * Copyright (C) 2017-2018 Lightbend Inc. - */ -package akka.cluster.sharding.typed - -import scala.reflect.ClassTag - -import akka.actor.typed.ActorRef -import akka.actor.typed.ActorSystem -import akka.actor.typed.Behavior -import akka.actor.typed.Behavior.UntypedBehavior -import akka.actor.typed.Extension -import akka.actor.typed.ExtensionId -import akka.actor.typed.Props -import akka.actor.typed.internal.adapter.ActorRefAdapter -import akka.actor.typed.internal.adapter.ActorSystemAdapter -import akka.annotation.DoNotInherit -import akka.annotation.InternalApi -import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy -import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy -import akka.cluster.sharding.ShardRegion -import akka.cluster.sharding.ShardRegion.{ StartEntity ⇒ UntypedStartEntity } -import akka.cluster.typed.Cluster -import akka.event.Logging -import akka.event.LoggingAdapter - -/** - * Default envelope type that may be used with Cluster Sharding. - * - * Cluster Sharding provides a default [[HashCodeMessageExtractor]] that is able to handle - * these types of messages, by hashing the entityId into into the shardId. It is not the only, - * but a convenient way to send envelope-wrapped messages via cluster sharding. - * - * The alternative way of routing messages through sharding is to not use envelopes, - * and have the message types themselves carry identifiers. - */ -final case class ShardingEnvelope[A](entityId: String, message: A) // TODO think if should remain a case class - -/** Allows starting a specific Sharded Entity by its entity identifier */ -object StartEntity { - - /** - * Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the - * specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it. - */ - def apply[A](entityId: String): ShardingEnvelope[A] = { - // StartEntity isn't really of type A, but erased and StartEntity is only handled internally, not delivered to the entity - new ShardingEnvelope[A](entityId, UntypedStartEntity(entityId).asInstanceOf[A]) - } - - /** - * Java API - * - * Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the - * specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it. - */ - def create[A](msgClass: Class[A], entityId: String): ShardingEnvelope[A] = - apply[A](entityId) -} - -object ShardingMessageExtractor { - - /** - * Scala API: - * - * Create the default message extractor, using envelopes to identify what entity a message is for - * and the hashcode of the entityId to decide which shard an entity belongs to. - * - * This is recommended since it does not force details about sharding into the entity protocol - */ - def apply[A](maxNumberOfShards: Int, handOffStopMessage: A): ShardingMessageExtractor[ShardingEnvelope[A], A] = - new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) - - /** - * Scala API: Create a message extractor for a protocol where the entity id is available in each message. - */ - def noEnvelope[A]( - maxNumberOfShards: Int, - handOffStopMessage: A)( - extractEntityId: A ⇒ String): ShardingMessageExtractor[A, A] = - new HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) { - def entityId(message: A) = extractEntityId(message) - } - -} - -/** - * Entirely customizable typed message extractor. Prefer [[HashCodeMessageExtractor]] or - * [[HashCodeNoEnvelopeMessageExtractor]]if possible. - * - * @tparam E Possibly an Envelope around the messages accepted by the entity actor, is the same as `A` if there is no - * envelope. - * @tparam A The type of message accepted by the entity actor - */ -abstract class ShardingMessageExtractor[E, A] { - - /** - * Extract the entity id from an incoming `message`. If `null` is returned - * the message will be `unhandled`, i.e. posted as `Unhandled` messages on the event stream - */ - def entityId(message: E): String - - /** - * The shard identifier for a given entity id. Only messages that passed the [[ShardingMessageExtractor#entityId]] - * function will be used as input to this function. - */ - def shardId(entityId: String): String - - /** - * Extract the message to send to the entity from an incoming `message`. - * Note that the extracted message does not have to be the same as the incoming - * message to support wrapping in message envelope that is unwrapped before - * sending to the entity actor. - */ - def unwrapMessage(message: E): A - - /** - * Message sent to an entity to tell it to stop, e.g. when rebalanced. - * The message defined here is not passed to `entityId`, `shardId` or `unwrapMessage`. - */ - def handOffStopMessage: A -} - -/** - * Default message extractor type, using envelopes to identify what entity a message is for - * and the hashcode of the entityId to decide which shard an entity belongs to. - * - * This is recommended since it does not force details about sharding into the entity protocol - * - * @tparam A The type of message accepted by the entity actor - */ -final class HashCodeMessageExtractor[A]( - val maxNumberOfShards: Int, - override val handOffStopMessage: A) - extends ShardingMessageExtractor[ShardingEnvelope[A], A] { - - override def entityId(envelope: ShardingEnvelope[A]): String = envelope.entityId - override def shardId(entityId: String): String = (math.abs(entityId.hashCode) % maxNumberOfShards).toString - override def unwrapMessage(envelope: ShardingEnvelope[A]): A = envelope.message -} - -/** - * Default message extractor type, using a property of the message to identify what entity a message is for - * and the hashcode of the entityId to decide which shard an entity belongs to. - * - * This is recommended since it does not force details about sharding into the entity protocol - * - * @tparam A The type of message accepted by the entity actor - */ -abstract class HashCodeNoEnvelopeMessageExtractor[A]( - val maxNumberOfShards: Int, - override val handOffStopMessage: A) - extends ShardingMessageExtractor[A, A] { - - override def shardId(entityId: String): String = (math.abs(entityId.hashCode) % maxNumberOfShards).toString - override final def unwrapMessage(message: A): A = message - - override def toString = s"HashCodeNoEnvelopeMessageExtractor($maxNumberOfShards)" -} - -/** - * INTERNAL API - * Extracts entityId and unwraps ShardingEnvelope and StartEntity messages. - * Other messages are delegated to the given `ShardingMessageExtractor`. - */ -@InternalApi private[akka] class ExtractorAdapter[E, A](delegate: ShardingMessageExtractor[E, A]) - extends ShardingMessageExtractor[Any, A] { - override def entityId(message: Any): String = { - message match { - case ShardingEnvelope(entityId, _) ⇒ entityId //also covers UntypedStartEntity in ShardingEnvelope - case UntypedStartEntity(entityId) ⇒ entityId - case msg: E @unchecked ⇒ delegate.entityId(msg) - } - } - - override def shardId(entityId: String): String = delegate.shardId(entityId) - - override def unwrapMessage(message: Any): A = { - message match { - case ShardingEnvelope(_, msg: A @unchecked) ⇒ - //also covers UntypedStartEntity in ShardingEnvelope - msg - case msg: UntypedStartEntity ⇒ - // not really of type A, but erased and StartEntity is only handled internally, not delivered to the entity - msg.asInstanceOf[A] - case msg: E @unchecked ⇒ - delegate.unwrapMessage(msg) - } - } - - override def handOffStopMessage: A = delegate.handOffStopMessage - - override def toString: String = delegate.toString -} - -/** - * The key of an entity type, the `name` must be unique. - * - * Not for user extension. - */ -@DoNotInherit abstract class EntityTypeKey[T] { - def name: String -} - -object EntityTypeKey { - /** - * Scala API: Creates an `EntityTypeKey`. The `name` must be unique. - */ - def apply[T](name: String)(implicit tTag: ClassTag[T]): EntityTypeKey[T] = - AdaptedClusterShardingImpl.EntityTypeKeyImpl(name, implicitly[ClassTag[T]].runtimeClass.getName) - - /** - * Java API: Creates an `EntityTypeKey`. The `name` must be unique. - */ - def create[T](messageClass: Class[T], name: String): EntityTypeKey[T] = - AdaptedClusterShardingImpl.EntityTypeKeyImpl(name, messageClass.getName) - -} - -object ClusterSharding extends ExtensionId[ClusterSharding] { - - override def createExtension(system: ActorSystem[_]): ClusterSharding = - new AdaptedClusterShardingImpl(system) - - /** Java API */ - def get(system: ActorSystem[_]): ClusterSharding = apply(system) -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] object AdaptedClusterShardingImpl { - final case class EntityTypeKeyImpl[T](name: String, messageClassName: String) extends EntityTypeKey[T] { - override def toString: String = s"EntityTypeKey[$messageClassName]($name)" - } -} - -/** INTERNAL API */ -@InternalApi -final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSharding { - - import akka.actor.typed.scaladsl.adapter._ - - require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted untyped actor systems can be used for cluster features") - - private val cluster = Cluster(system) - private val untypedSystem = system.toUntyped - private val untypedSharding = akka.cluster.sharding.ClusterSharding(untypedSystem) - private val log: LoggingAdapter = Logging(untypedSystem, classOf[ClusterSharding]) - - override def spawn[A]( - behavior: String ⇒ Behavior[A], - entityProps: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - maxNumberOfShards: Int, - handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = { - val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) - spawn2(behavior, entityProps, typeKey, settings, extractor, defaultShardAllocationStrategy(settings)) - } - - override def spawnJavadsl[A]( - behavior: EntityIdToBehavior[A], - entityProps: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - maxNumberOfShards: Int, - handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = { - val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) - spawnJavadsl(behavior, entityProps, typeKey, settings, extractor, defaultShardAllocationStrategy(settings)) - } - - override def spawn3[E, A]( - behavior: String ⇒ Behavior[A], - entityProps: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - messageExtractor: ShardingMessageExtractor[E, A]): ActorRef[E] = - spawn2(behavior, entityProps, typeKey, settings, messageExtractor, defaultShardAllocationStrategy(settings)) - - override def spawnJavadsl[E, A]( - behavior: EntityIdToBehavior[A], - entityProps: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - messageExtractor: ShardingMessageExtractor[E, A]): ActorRef[E] = - spawnJavadsl(behavior, entityProps, typeKey, settings, messageExtractor, defaultShardAllocationStrategy(settings)) - - override def spawn2[E, A]( - behavior: String ⇒ Behavior[A], - entityProps: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - extractor: ShardingMessageExtractor[E, A], - allocationStrategy: ShardAllocationStrategy): ActorRef[E] = { - - val untypedSettings = ClusterShardingSettings.toUntypedSettings(settings) - - val extractorAdapter = new ExtractorAdapter(extractor) - val extractEntityId: ShardRegion.ExtractEntityId = { - // TODO is it possible to avoid the double evaluation of entityId - case message if extractorAdapter.entityId(message) != null ⇒ - (extractorAdapter.entityId(message), extractorAdapter.unwrapMessage(message)) - } - val extractShardId: ShardRegion.ExtractShardId = { message ⇒ - extractorAdapter.entityId(message) match { - case null ⇒ null - case eid ⇒ extractorAdapter.shardId(eid) - } - } - - val ref = - if (settings.shouldHostShard(cluster)) { - log.info("Starting Shard Region [{}]...", typeKey.name) - - val untypedEntityPropsFactory: String ⇒ akka.actor.Props = { entityId ⇒ - behavior(entityId) match { - case u: UntypedBehavior[_] ⇒ u.untypedProps // PersistentBehavior - case b ⇒ PropsAdapter(b, entityProps) - } - } - - untypedSharding.internalStart( - typeKey.name, - untypedEntityPropsFactory, - untypedSettings, - extractEntityId, - extractShardId, - defaultShardAllocationStrategy(settings), - extractor.handOffStopMessage) - } else { - system.log.info("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...") - - untypedSharding.startProxy( - typeKey.name, - settings.role, - dataCenter = None, // TODO what about the multi-dc value here? issue #23689 - extractEntityId, - extractShardId) - } - - ActorRefAdapter(ref) - } - - override def spawnJavadsl[E, A]( - behavior: EntityIdToBehavior[A], - entityProps: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - extractor: ShardingMessageExtractor[E, A], - allocationStrategy: ShardAllocationStrategy): ActorRef[E] = { - spawn2(entityId ⇒ behavior.apply(entityId), entityProps, typeKey, settings, extractor, allocationStrategy) - } - - override def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A] = { - new AdaptedEntityRefImpl[A](untypedSharding.shardRegion(typeKey.name), entityId) - } - - override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = { - val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold - val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance - new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance) - } - -} - -@FunctionalInterface -trait EntityIdToBehavior[A] { - def apply(entityId: String): Behavior[A] -} - -@DoNotInherit -sealed abstract class ClusterSharding extends Extension { - - /** - * Scala API: Spawn a shard region or a proxy depending on if the settings require role and if this node has - * such a role. - * - * Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the - * recipient actor. - * A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId - * [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy. - * - * @param behavior Create the behavior for an entity given a entityId - * @param typeKey A key that uniquely identifies the type of entity in this cluster - * @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced. - * @tparam A The type of command the entity accepts - */ - def spawn[A]( - behavior: String ⇒ Behavior[A], - props: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - maxNumberOfShards: Int, - handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] - - /** - * Java API: Spawn a shard region or a proxy depending on if the settings require role and if this node has - * such a role. - * - * Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the - * recipient actor. - * A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId - * [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy. - * - * @param behavior Create the behavior for an entity given a entityId - * @param typeKey A key that uniquely identifies the type of entity in this cluster - * @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced. - * @tparam A The type of command the entity accepts - */ - def spawnJavadsl[A]( // FIXME javadsl package - behavior: EntityIdToBehavior[A], - props: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - maxNumberOfShards: Int, - handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] - - /** - * Scala API: Spawn a shard region or a proxy depending on if the settings require role and if this node - * has such a role. - * - * @param behavior Create the behavior for an entity given a entityId - * @param typeKey A key that uniquely identifies the type of entity in this cluster - * @param entityProps Props to apply when starting an entity - * @param messageExtractor Extract entityId, shardId, and unwrap messages. - * @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards - * @tparam E A possible envelope around the message the entity accepts - * @tparam A The type of command the entity accepts - */ - def spawn2[E, A]( - behavior: String ⇒ Behavior[A], - entityProps: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - messageExtractor: ShardingMessageExtractor[E, A], - allocationStrategy: ShardAllocationStrategy): ActorRef[E] - - /** - * Java API: Spawn a shard region or a proxy depending on if the settings require role and if this node - * has such a role. - * - * @param behavior Create the behavior for an entity given a entityId - * @param typeKey A key that uniquely identifies the type of entity in this cluster - * @param entityProps Props to apply when starting an entity - * @param messageExtractor Extract entityId, shardId, and unwrap messages. - * @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards - * @tparam E A possible envelope around the message the entity accepts - * @tparam A The type of command the entity accepts - */ - def spawnJavadsl[E, A]( // FIXME javadsl package - behavior: EntityIdToBehavior[A], - entityProps: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - messageExtractor: ShardingMessageExtractor[E, A], - allocationStrategy: ShardAllocationStrategy): ActorRef[E] - - /** - * Scala API: Spawn a shard region or a proxy depending on if the settings require role and if this node - * has such a role. - * - * @param behavior Create the behavior for an entity given a entityId - * @param typeKey A key that uniquely identifies the type of entity in this cluster - * @param entityProps Props to apply when starting an entity - * @param messageExtractor Extract entityId, shardId, and unwrap messages. - * @tparam E A possible envelope around the message the entity accepts - * @tparam A The type of command the entity accepts - */ - def spawn3[E, A]( - behavior: String ⇒ Behavior[A], - entityProps: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - messageExtractor: ShardingMessageExtractor[E, A]): ActorRef[E] - - /** - * Java API: Spawn a shard region or a proxy depending on if the settings require role and if this node - * has such a role. - * - * @param behavior Create the behavior for an entity given a entityId - * @param typeKey A key that uniquely identifies the type of entity in this cluster - * @param entityProps Props to apply when starting an entity - * @param messageExtractor Extract entityId, shardId, and unwrap messages. - * @tparam E A possible envelope around the message the entity accepts - * @tparam A The type of command the entity accepts - */ - def spawnJavadsl[E, A]( // FIXME javadsl package - behavior: EntityIdToBehavior[A], - entityProps: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - messageExtractor: ShardingMessageExtractor[E, A]): ActorRef[E] - - /** - * Create an `ActorRef`-like reference to a specific sharded entity. - * Currently you have to correctly specify the type of messages the target can handle. - * - * Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the - * here provided `entityId`. - * - * FIXME a more typed version of this API will be explored in https://github.com/akka/akka/issues/23690 - * - * For in-depth documentation of its semantics, see [[EntityRef]]. - */ - def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A] - - /** The default ShardAllocationStrategy currently is [[LeastShardAllocationStrategy]] however could be changed in the future. */ - def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy -} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/EntityRef.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/EntityRef.scala deleted file mode 100644 index aaab5a4b45..0000000000 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/EntityRef.scala +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (C) 2017-2018 Lightbend Inc. - */ -package akka.cluster.sharding.typed - -import akka.actor.{ InternalActorRef, Scheduler } -import akka.annotation.InternalApi -import akka.pattern.{ AskTimeoutException, PromiseActorRef } -import akka.actor.typed.ActorRef -import akka.actor.typed.scaladsl.AskPattern -import akka.actor.typed.scaladsl.AskPattern.PromiseRef -import akka.util.Timeout - -import scala.concurrent.Future - -/** - * A reference to an sharded Entity, which allows `ActorRef`-like usage. - * - * An [[EntityRef]] is NOT an [[ActorRef]]–by design–in order to be explicit about the fact that the life-cycle - * of a sharded Entity is very different than a plain Actors. Most notably, this is shown by features of Entities - * such as re-balancing (an active Entity to a different node) or passivation. Both of which are aimed to be completely - * transparent to users of such Entity. In other words, if this were to be a plain ActorRef, it would be possible to - * apply DeathWatch to it, which in turn would then trigger when the sharded Actor stopped, breaking the illusion that - * Entity refs are "always there". Please note that while not encouraged, it is possible to expose an Actor's `self` - * [[ActorRef]] and watch it in case such notification is desired. - */ -trait EntityRef[A] { - - /** - * Send a message to the entity referenced by this EntityRef using *at-most-once* - * messaging semantics. - */ - def tell(msg: A): Unit - - /** - * Allows to "ask" the [[EntityRef]] for a reply. - * See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern - * - * Example usage: - * {{{ - * case class Request(msg: String, replyTo: ActorRef[Reply]) - * case class Reply(msg: String) - * - * implicit val timeout = Timeout(3.seconds) - * val target: EntityRef[Request] = ... - * val f: Future[Reply] = target ? (Request("hello", _)) - * }}} - * - * Please note that an implicit [[akka.util.Timeout]] and [[akka.actor.Scheduler]] must be available to use this pattern. - */ - def ask[U](f: ActorRef[U] ⇒ A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] - -} - -object EntityRef { - implicit final class EntityRefOps[A](val ref: EntityRef[A]) extends AnyVal { - /** - * Send a message to the Actor referenced by this ActorRef using *at-most-once* - * messaging semantics. - */ - def !(msg: A): Unit = ref.tell(msg) - - /** - * Allows to "ask" the [[EntityRef]] for a reply. - * See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern - * - * Example usage: - * {{{ - * case class Request(msg: String, replyTo: ActorRef[Reply]) - * case class Reply(msg: String) - * - * implicit val timeout = Timeout(3.seconds) - * val target: EntityRef[Request] = ... - * val f: Future[Reply] = target ? (Request("hello", _)) - * }}} - * - * Please note that an implicit [[akka.util.Timeout]] and [[akka.actor.Scheduler]] must be available to use this pattern. - */ - def ?[U](f: ActorRef[U] ⇒ A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = - ref.ask(f)(timeout, scheduler) - } -} - -@InternalApi -private[akka] final class AdaptedEntityRefImpl[A](shardRegion: akka.actor.ActorRef, entityId: String) extends EntityRef[A] { - - override def tell(msg: A): Unit = - shardRegion ! ShardingEnvelope(entityId, msg) - - override def ask[U](f: (ActorRef[U]) ⇒ A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = { - val p = new EntityPromiseRef[U](shardRegion.asInstanceOf[InternalActorRef], timeout) - val m = f(p.ref) - if (p.promiseRef ne null) p.promiseRef.messageClassName = m.getClass.getName - shardRegion ! ShardingEnvelope(entityId, m) - p.future - } - - /** Similar to [[akka.actor.typed.scaladsl.AskPattern.PromiseRef]] but for an [[EntityRef]] target. */ - @InternalApi - private final class EntityPromiseRef[U](untyped: InternalActorRef, timeout: Timeout) { - import akka.actor.typed.internal.{ adapter ⇒ adapt } - - // Note: _promiseRef mustn't have a type pattern, since it can be null - private[this] val (_ref: ActorRef[U], _future: Future[U], _promiseRef) = - if (untyped.isTerminated) - ( - adapt.ActorRefAdapter[U](untyped.provider.deadLetters), - Future.failed[U](new AskTimeoutException(s"Recipient[$untyped] had already been terminated.")), - null) - else if (timeout.duration.length <= 0) - ( - adapt.ActorRefAdapter[U](untyped.provider.deadLetters), - Future.failed[U](new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$untyped]")), - null - ) - else { - val a = PromiseActorRef(untyped.provider, timeout, untyped, "unknown") - val b = adapt.ActorRefAdapter[U](a) - (b, a.result.future.asInstanceOf[Future[U]], a) - } - - val ref: ActorRef[U] = _ref - val future: Future[U] = _future - val promiseRef: PromiseActorRef = _promiseRef - } - -} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingMessageExtractor.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingMessageExtractor.scala new file mode 100644 index 0000000000..f7bdffbd23 --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingMessageExtractor.scala @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ +package akka.cluster.sharding.typed + +object ShardingMessageExtractor { + + /** + * Scala API: + * + * Create the default message extractor, using envelopes to identify what entity a message is for + * and the hashcode of the entityId to decide which shard an entity belongs to. + * + * This is recommended since it does not force details about sharding into the entity protocol + */ + def apply[A](maxNumberOfShards: Int, handOffStopMessage: A): ShardingMessageExtractor[ShardingEnvelope[A], A] = + new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) + + /** + * Scala API: Create a message extractor for a protocol where the entity id is available in each message. + */ + def noEnvelope[A]( + maxNumberOfShards: Int, + handOffStopMessage: A)( + extractEntityId: A ⇒ String): ShardingMessageExtractor[A, A] = + new HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) { + def entityId(message: A) = extractEntityId(message) + } + +} + +/** + * Entirely customizable typed message extractor. Prefer [[HashCodeMessageExtractor]] or + * [[HashCodeNoEnvelopeMessageExtractor]]if possible. + * + * @tparam E Possibly an Envelope around the messages accepted by the entity actor, is the same as `A` if there is no + * envelope. + * @tparam A The type of message accepted by the entity actor + */ +abstract class ShardingMessageExtractor[E, A] { + + /** + * Extract the entity id from an incoming `message`. If `null` is returned + * the message will be `unhandled`, i.e. posted as `Unhandled` messages on the event stream + */ + def entityId(message: E): String + + /** + * The shard identifier for a given entity id. Only messages that passed the [[ShardingMessageExtractor#entityId]] + * function will be used as input to this function. + */ + def shardId(entityId: String): String + + /** + * Extract the message to send to the entity from an incoming `message`. + * Note that the extracted message does not have to be the same as the incoming + * message to support wrapping in message envelope that is unwrapped before + * sending to the entity actor. + */ + def unwrapMessage(message: E): A + + /** + * Message sent to an entity to tell it to stop, e.g. when rebalanced. + * The message defined here is not passed to `entityId`, `shardId` or `unwrapMessage`. + */ + def handOffStopMessage: A +} + +/** + * Default message extractor type, using envelopes to identify what entity a message is for + * and the hashcode of the entityId to decide which shard an entity belongs to. + * + * This is recommended since it does not force details about sharding into the entity protocol + * + * @tparam A The type of message accepted by the entity actor + */ +final class HashCodeMessageExtractor[A]( + val maxNumberOfShards: Int, + override val handOffStopMessage: A) + extends ShardingMessageExtractor[ShardingEnvelope[A], A] { + + override def entityId(envelope: ShardingEnvelope[A]): String = envelope.entityId + override def shardId(entityId: String): String = (math.abs(entityId.hashCode) % maxNumberOfShards).toString + override def unwrapMessage(envelope: ShardingEnvelope[A]): A = envelope.message +} + +/** + * Default message extractor type, using a property of the message to identify what entity a message is for + * and the hashcode of the entityId to decide which shard an entity belongs to. + * + * This is recommended since it does not force details about sharding into the entity protocol + * + * @tparam A The type of message accepted by the entity actor + */ +abstract class HashCodeNoEnvelopeMessageExtractor[A]( + val maxNumberOfShards: Int, + override val handOffStopMessage: A) + extends ShardingMessageExtractor[A, A] { + + override def shardId(entityId: String): String = (math.abs(entityId.hashCode) % maxNumberOfShards).toString + override final def unwrapMessage(message: A): A = message + + override def toString = s"HashCodeNoEnvelopeMessageExtractor($maxNumberOfShards)" +} + +/** + * Default envelope type that may be used with Cluster Sharding. + * + * Cluster Sharding provides a default [[HashCodeMessageExtractor]] that is able to handle + * these types of messages, by hashing the entityId into into the shardId. It is not the only, + * but a convenient way to send envelope-wrapped messages via cluster sharding. + * + * The alternative way of routing messages through sharding is to not use envelopes, + * and have the message types themselves carry identifiers. + */ +final case class ShardingEnvelope[A](entityId: String, message: A) // TODO think if should remain a case class + diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala new file mode 100644 index 0000000000..60d14160b0 --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -0,0 +1,248 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ +package akka.cluster.sharding.typed +package internal + +import java.util.Optional +import java.util.concurrent.CompletionStage + +import scala.compat.java8.OptionConverters._ +import scala.compat.java8.FutureConverters._ +import scala.concurrent.Future + +import akka.actor.InternalActorRef +import akka.actor.Scheduler +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.Behavior.UntypedBehavior +import akka.actor.typed.Props +import akka.actor.typed.internal.adapter.ActorRefAdapter +import akka.actor.typed.internal.adapter.ActorSystemAdapter +import akka.annotation.InternalApi +import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy +import akka.cluster.sharding.ShardRegion +import akka.cluster.sharding.ShardRegion.{ StartEntity ⇒ UntypedStartEntity } +import akka.cluster.sharding.typed.javadsl.EntityIdToBehavior +import akka.cluster.typed.Cluster +import akka.event.Logging +import akka.event.LoggingAdapter +import akka.pattern.AskTimeoutException +import akka.pattern.PromiseActorRef +import akka.util.Timeout +import akka.japi.function.{ Function ⇒ JFunction } + +/** + * INTERNAL API + * Extracts entityId and unwraps ShardingEnvelope and StartEntity messages. + * Other messages are delegated to the given `ShardingMessageExtractor`. + */ +@InternalApi private[akka] class ExtractorAdapter[E, A](delegate: ShardingMessageExtractor[E, A]) + extends ShardingMessageExtractor[Any, A] { + override def entityId(message: Any): String = { + message match { + case ShardingEnvelope(entityId, _) ⇒ entityId //also covers UntypedStartEntity in ShardingEnvelope + case UntypedStartEntity(entityId) ⇒ entityId + case msg: E @unchecked ⇒ delegate.entityId(msg) + } + } + + override def shardId(entityId: String): String = delegate.shardId(entityId) + + override def unwrapMessage(message: Any): A = { + message match { + case ShardingEnvelope(_, msg: A @unchecked) ⇒ + //also covers UntypedStartEntity in ShardingEnvelope + msg + case msg: UntypedStartEntity ⇒ + // not really of type A, but erased and StartEntity is only handled internally, not delivered to the entity + msg.asInstanceOf[A] + case msg: E @unchecked ⇒ + delegate.unwrapMessage(msg) + } + } + + override def handOffStopMessage: A = delegate.handOffStopMessage + + override def toString: String = delegate.toString +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] final case class EntityTypeKeyImpl[T](name: String, messageClassName: String) + extends javadsl.EntityTypeKey[T] with scaladsl.EntityTypeKey[T] { + override def toString: String = s"EntityTypeKey[$messageClassName]($name)" +} + +/** INTERNAL API */ +@InternalApi private[akka] final class ClusterShardingImpl(system: ActorSystem[_]) + extends javadsl.ClusterSharding with scaladsl.ClusterSharding { + + import akka.actor.typed.scaladsl.adapter._ + + require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted untyped actor systems can be used for cluster features") + + private val cluster = Cluster(system) + private val untypedSystem = system.toUntyped + private val untypedSharding = akka.cluster.sharding.ClusterSharding(untypedSystem) + private val log: LoggingAdapter = Logging(untypedSystem, classOf[scaladsl.ClusterSharding]) + + override def spawn[A]( + behavior: String ⇒ Behavior[A], + entityProps: Props, + typeKey: scaladsl.EntityTypeKey[A], + settings: ClusterShardingSettings, + maxNumberOfShards: Int, + handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = { + val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) + spawnWithMessageExtractor(behavior, entityProps, typeKey, settings, extractor, Some(defaultShardAllocationStrategy(settings))) + } + + override def spawn[A]( + behavior: EntityIdToBehavior[A], + entityProps: Props, + typeKey: javadsl.EntityTypeKey[A], + settings: ClusterShardingSettings, + maxNumberOfShards: Int, + handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = { + val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) + spawnWithMessageExtractor(behavior, entityProps, typeKey, settings, extractor, + Optional.of(defaultShardAllocationStrategy(settings))) + } + + override def spawnWithMessageExtractor[E, A]( + behavior: String ⇒ Behavior[A], + entityProps: Props, + typeKey: scaladsl.EntityTypeKey[A], + settings: ClusterShardingSettings, + extractor: ShardingMessageExtractor[E, A], + allocationStrategy: Option[ShardAllocationStrategy]): ActorRef[E] = { + + val untypedSettings = ClusterShardingSettings.toUntypedSettings(settings) + + val extractorAdapter = new ExtractorAdapter(extractor) + val extractEntityId: ShardRegion.ExtractEntityId = { + // TODO is it possible to avoid the double evaluation of entityId + case message if extractorAdapter.entityId(message) != null ⇒ + (extractorAdapter.entityId(message), extractorAdapter.unwrapMessage(message)) + } + val extractShardId: ShardRegion.ExtractShardId = { message ⇒ + extractorAdapter.entityId(message) match { + case null ⇒ null + case eid ⇒ extractorAdapter.shardId(eid) + } + } + + val ref = + if (settings.shouldHostShard(cluster)) { + log.info("Starting Shard Region [{}]...", typeKey.name) + + val untypedEntityPropsFactory: String ⇒ akka.actor.Props = { entityId ⇒ + behavior(entityId) match { + case u: UntypedBehavior[_] ⇒ u.untypedProps // PersistentBehavior + case b ⇒ PropsAdapter(b, entityProps) + } + } + + untypedSharding.internalStart( + typeKey.name, + untypedEntityPropsFactory, + untypedSettings, + extractEntityId, + extractShardId, + defaultShardAllocationStrategy(settings), + extractor.handOffStopMessage) + } else { + system.log.info("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...") + + untypedSharding.startProxy( + typeKey.name, + settings.role, + dataCenter = None, // TODO what about the multi-dc value here? issue #23689 + extractEntityId, + extractShardId) + } + + ActorRefAdapter(ref) + } + + override def spawnWithMessageExtractor[E, A]( + behavior: EntityIdToBehavior[A], + entityProps: Props, + typeKey: javadsl.EntityTypeKey[A], + settings: ClusterShardingSettings, + extractor: ShardingMessageExtractor[E, A], + allocationStrategy: Optional[ShardAllocationStrategy]): ActorRef[E] = { + spawnWithMessageExtractor(entityId ⇒ behavior.apply(entityId), entityProps, typeKey.asScala, + settings, extractor, allocationStrategy.asScala) + } + + override def entityRefFor[A](typeKey: scaladsl.EntityTypeKey[A], entityId: String): scaladsl.EntityRef[A] = { + new EntityRefImpl[A](untypedSharding.shardRegion(typeKey.name), entityId) + } + + override def entityRefFor[A](typeKey: javadsl.EntityTypeKey[A], entityId: String): javadsl.EntityRef[A] = { + new EntityRefImpl[A](untypedSharding.shardRegion(typeKey.name), entityId) + } + + override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = { + val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold + val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance + new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance) + } + +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] final class EntityRefImpl[A](shardRegion: akka.actor.ActorRef, entityId: String) + extends javadsl.EntityRef[A] with scaladsl.EntityRef[A] { + + override def tell(msg: A): Unit = + shardRegion ! ShardingEnvelope(entityId, msg) + + override def ask[U](message: (ActorRef[U]) ⇒ A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = { + val replyTo = new EntityPromiseRef[U](shardRegion.asInstanceOf[InternalActorRef], timeout) + val m = message(replyTo.ref) + if (replyTo.promiseRef ne null) replyTo.promiseRef.messageClassName = m.getClass.getName + shardRegion ! ShardingEnvelope(entityId, m) + replyTo.future + } + + def ask[U](message: JFunction[ActorRef[U], A], timeout: Timeout, scheduler: Scheduler): CompletionStage[U] = + ask[U](replyTo ⇒ message.apply(replyTo))(timeout, scheduler).toJava + + /** Similar to [[akka.actor.typed.scaladsl.AskPattern.PromiseRef]] but for an `EntityRef` target. */ + @InternalApi + private final class EntityPromiseRef[U](untyped: InternalActorRef, timeout: Timeout) { + import akka.actor.typed.internal.{ adapter ⇒ adapt } + + // Note: _promiseRef mustn't have a type pattern, since it can be null + private[this] val (_ref: ActorRef[U], _future: Future[U], _promiseRef) = + if (untyped.isTerminated) + ( + adapt.ActorRefAdapter[U](untyped.provider.deadLetters), + Future.failed[U](new AskTimeoutException(s"Recipient[$untyped] had already been terminated.")), + null) + else if (timeout.duration.length <= 0) + ( + adapt.ActorRefAdapter[U](untyped.provider.deadLetters), + Future.failed[U](new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$untyped]")), + null + ) + else { + val a = PromiseActorRef(untyped.provider, timeout, untyped, "unknown") + val b = adapt.ActorRefAdapter[U](a) + (b, a.result.future.asInstanceOf[Future[U]], a) + } + + val ref: ActorRef[U] = _ref + val future: Future[U] = _future + val promiseRef: PromiseActorRef = _promiseRef + } + +} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingSerializer.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingSerializer.scala index bebc147690..e288b878ac 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingSerializer.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingSerializer.scala @@ -5,11 +5,12 @@ package akka.cluster.sharding.typed.internal import java.io.NotSerializableException -import akka.cluster.sharding.typed.internal.protobuf.ShardingMessages import akka.annotation.InternalApi +import akka.cluster.sharding.typed.ShardingEnvelope +import akka.cluster.sharding.typed.internal.protobuf.ShardingMessages import akka.remote.serialization.WrappedPayloadSupport -import akka.serialization.{ BaseSerializer, SerializerWithStringManifest } -import akka.cluster.sharding.typed.{ ShardingEnvelope, StartEntity } +import akka.serialization.BaseSerializer +import akka.serialization.SerializerWithStringManifest /** * INTERNAL API @@ -31,9 +32,7 @@ import akka.cluster.sharding.typed.{ ShardingEnvelope, StartEntity } case env: ShardingEnvelope[_] ⇒ val builder = ShardingMessages.ShardingEnvelope.newBuilder() builder.setEntityId(env.entityId) - // special null for StartEntity, might change with issue #23679 - if (env.message != null) - builder.setMessage(payloadSupport.payloadBuilder(env.message)) + builder.setMessage(payloadSupport.payloadBuilder(env.message)) builder.build().toByteArray() case _ ⇒ @@ -44,13 +43,8 @@ import akka.cluster.sharding.typed.{ ShardingEnvelope, StartEntity } case ShardingEnvelopeManifest ⇒ val env = ShardingMessages.ShardingEnvelope.parseFrom(bytes) val entityId = env.getEntityId - if (env.hasMessage) { - val wrappedMsg = payloadSupport.deserializePayload(env.getMessage) - ShardingEnvelope(entityId, wrappedMsg) - } else { - // special for StartEntity, might change with issue #23679 - StartEntity(entityId) - } + val wrappedMsg = payloadSupport.deserializePayload(env.getMessage) + ShardingEnvelope(entityId, wrappedMsg) case _ ⇒ throw new NotSerializableException( s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala new file mode 100644 index 0000000000..53ed19061b --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -0,0 +1,270 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ +package akka.cluster.sharding.typed +package javadsl + +import java.util.Optional +import java.util.concurrent.CompletionStage + +import akka.actor.Scheduler +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.Props +import akka.annotation.DoNotInherit +import akka.annotation.InternalApi +import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy +import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl +import akka.japi.function.{ Function ⇒ JFunction } +import akka.util.Timeout + +@FunctionalInterface +trait EntityIdToBehavior[A] { + def apply(entityId: String): Behavior[A] +} + +object ClusterSharding { + def get(system: ActorSystem[_]): ClusterSharding = + scaladsl.ClusterSharding(system).asJava +} + +/** + * This extension provides sharding functionality of actors in a cluster. + * The typical use case is when you have many stateful actors that together consume + * more resources (e.g. memory) than fit on one machine. You need to distribute them across + * several nodes in the cluster and you want to be able to interact with them using their + * logical identifier, but without having to care about their physical location in the cluster, + * which might also change over time. It could for example be actors representing Aggregate Roots in + * Domain-Driven Design terminology. Here we call these actors "entities". These actors + * typically have persistent (durable) state, but this feature is not limited to + * actors with persistent state. + * + * In this context sharding means that actors with an identifier, so called entities, + * can be automatically distributed across multiple nodes in the cluster. Each entity + * actor runs only at one place, and messages can be sent to the entity without requiring + * the sender to know the location of the destination actor. This is achieved by sending + * the messages via a `ShardRegion` actor provided by this extension, which knows how + * to route the message with the entity id to the final destination. + * + * This extension is supposed to be used by first, typically at system startup on each node + * in the cluster, registering the supported entity types with the [[ClusterSharding#spawn]] + * method, which returns the `ShardRegion` actor reference for a named entity type. + * Messages to the entities are always sent via that `ActorRef`, i.e. the local `ShardRegion`. + * Messages can also be sent via the [[EntityRef]] retrieved with [[ClusterSharding#entityRefFor]], + * which will also send via the local `ShardRegion`. + * + * Some settings can be configured as described in the `akka.cluster.sharding` + * section of the `reference.conf`. + * + * The `ShardRegion` actor is started on each node in the cluster, or group of nodes + * tagged with a specific role. The `ShardRegion` is created with a [[ShardingMessageExtractor]] + * to extract the entity identifier and the shard identifier from incoming messages. + * A shard is a group of entities that will be managed together. For the first message in a + * specific shard the `ShardRegion` requests the location of the shard from a central coordinator, + * the [[ShardCoordinator]]. The `ShardCoordinator` decides which `ShardRegion` + * owns the shard. The `ShardRegion` receives the decided home of the shard + * and if that is the `ShardRegion` instance itself it will create a local child + * actor representing the entity and direct all messages for that entity to it. + * If the shard home is another `ShardRegion` instance messages will be forwarded + * to that `ShardRegion` instance instead. While resolving the location of a + * shard incoming messages for that shard are buffered and later delivered when the + * shard location is known. Subsequent messages to the resolved shard can be delivered + * to the target destination immediately without involving the `ShardCoordinator`. + * + * To make sure that at most one instance of a specific entity actor is running somewhere + * in the cluster it is important that all nodes have the same view of where the shards + * are located. Therefore the shard allocation decisions are taken by the central + * `ShardCoordinator`, which is running as a cluster singleton, i.e. one instance on + * the oldest member among all cluster nodes or a group of nodes tagged with a specific + * role. The oldest member can be determined by [[akka.cluster.Member#isOlderThan]]. + * + * To be able to use newly added members in the cluster the coordinator facilitates rebalancing + * of shards, i.e. migrate entities from one node to another. In the rebalance process the + * coordinator first notifies all `ShardRegion` actors that a handoff for a shard has started. + * That means they will start buffering incoming messages for that shard, in the same way as if the + * shard location is unknown. During the rebalance process the coordinator will not answer any + * requests for the location of shards that are being rebalanced, i.e. local buffering will + * continue until the handoff is completed. The `ShardRegion` responsible for the rebalanced shard + * will stop all entities in that shard by sending the `handOffMessage` to them. When all entities have + * been terminated the `ShardRegion` owning the entities will acknowledge the handoff as completed + * to the coordinator. Thereafter the coordinator will reply to requests for the location of + * the shard and thereby allocate a new home for the shard and then buffered messages in the + * `ShardRegion` actors are delivered to the new location. This means that the state of the entities + * are not transferred or migrated. If the state of the entities are of importance it should be + * persistent (durable), e.g. with `akka-persistence`, so that it can be recovered at the new + * location. + * + * The logic that decides which shards to rebalance is defined in a plugable shard + * allocation strategy. The default implementation [[LeastShardAllocationStrategy]] + * picks shards for handoff from the `ShardRegion` with most number of previously allocated shards. + * They will then be allocated to the `ShardRegion` with least number of previously allocated shards, + * i.e. new members in the cluster. There is a configurable threshold of how large the difference + * must be to begin the rebalancing. This strategy can be replaced by an application specific + * implementation. + * + * The state of shard locations in the `ShardCoordinator` is stored with `akka-distributed-data` or + * `akka-persistence` to survive failures. When a crashed or unreachable coordinator + * node has been removed (via down) from the cluster a new `ShardCoordinator` singleton + * actor will take over and the state is recovered. During such a failure period shards + * with known location are still available, while messages for new (unknown) shards + * are buffered until the new `ShardCoordinator` becomes available. + * + * As long as a sender uses the same `ShardRegion` actor to deliver messages to an entity + * actor the order of the messages is preserved. As long as the buffer limit is not reached + * messages are delivered on a best effort basis, with at-most once delivery semantics, + * in the same way as ordinary message sending. Reliable end-to-end messaging, with + * at-least-once semantics can be added by using `AtLeastOnceDelivery` in `akka-persistence`. + * + * Some additional latency is introduced for messages targeted to new or previously + * unused shards due to the round-trip to the coordinator. Rebalancing of shards may + * also add latency. This should be considered when designing the application specific + * shard resolution, e.g. to avoid too fine grained shards. + * + * The `ShardRegion` actor can also be started in proxy only mode, i.e. it will not + * host any entities itself, but knows how to delegate messages to the right location. + * + * If the state of the entities are persistent you may stop entities that are not used to + * reduce memory consumption. This is done by the application specific implementation of + * the entity actors for example by defining receive timeout (`context.setReceiveTimeout`). + * If a message is already enqueued to the entity when it stops itself the enqueued message + * in the mailbox will be dropped. To support graceful passivation without losing such + * messages the entity actor can send [[ShardRegion.Passivate]] to its parent `ShardRegion`. + * The specified wrapped message in `Passivate` will be sent back to the entity, which is + * then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion` + * between reception of `Passivate` and termination of the entity. Such buffered messages + * are thereafter delivered to a new incarnation of the entity. + * + */ +@DoNotInherit +abstract class ClusterSharding { + + /** + * Spawn a shard region or a proxy depending on if the settings require role and if this node has + * such a role. + * + * Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the + * recipient actor. + * A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId + * [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy. + * + * @param behavior Create the behavior for an entity given a entityId + * @param typeKey A key that uniquely identifies the type of entity in this cluster + * @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced. + * @tparam A The type of command the entity accepts + */ + def spawn[A]( + behavior: EntityIdToBehavior[A], + props: Props, + typeKey: EntityTypeKey[A], + settings: ClusterShardingSettings, + maxNumberOfShards: Int, + handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] + + /** + * Spawn a shard region or a proxy depending on if the settings require role and if this node + * has such a role. + * + * @param behavior Create the behavior for an entity given a entityId + * @param typeKey A key that uniquely identifies the type of entity in this cluster + * @param entityProps Props to apply when starting an entity + * @param messageExtractor Extract entityId, shardId, and unwrap messages. + * @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards, + * [[ClusterSharding#defaultShardAllocationStrategy]] is used if `Optional.empty` + * @tparam E A possible envelope around the message the entity accepts + * @tparam A The type of command the entity accepts + */ + def spawnWithMessageExtractor[E, A]( + behavior: EntityIdToBehavior[A], + entityProps: Props, + typeKey: EntityTypeKey[A], + settings: ClusterShardingSettings, + messageExtractor: ShardingMessageExtractor[E, A], + allocationStrategy: Optional[ShardAllocationStrategy]): ActorRef[E] + + /** + * Create an `ActorRef`-like reference to a specific sharded entity. + * Currently you have to correctly specify the type of messages the target can handle. + * + * Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the + * here provided `entityId`. + * + * For in-depth documentation of its semantics, see [[EntityRef]]. + */ + def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A] + + /** The default ShardAllocationStrategy currently is [[LeastShardAllocationStrategy]] however could be changed in the future. */ + def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy +} + +/** Allows starting a specific Sharded Entity by its entity identifier */ +object StartEntity { + + /** + * Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the + * specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it. + */ + def create[A](msgClass: Class[A], entityId: String): ShardingEnvelope[A] = + scaladsl.StartEntity[A](entityId) +} + +/** + * The key of an entity type, the `name` must be unique. + * + * Not for user extension. + */ +@DoNotInherit abstract class EntityTypeKey[T] { scaladslSelf: scaladsl.EntityTypeKey[T] ⇒ + def name: String + + /** + * INTERNAL API + */ + @InternalApi private[akka] def asScala: scaladsl.EntityTypeKey[T] = scaladslSelf +} + +object EntityTypeKey { + + /** + * Creates an `EntityTypeKey`. The `name` must be unique. + */ + def create[T](messageClass: Class[T], name: String): EntityTypeKey[T] = + EntityTypeKeyImpl(name, messageClass.getName) + +} + +/** + * A reference to an sharded Entity, which allows `ActorRef`-like usage. + * + * An [[EntityRef]] is NOT an [[ActorRef]]–by design–in order to be explicit about the fact that the life-cycle + * of a sharded Entity is very different than a plain Actors. Most notably, this is shown by features of Entities + * such as re-balancing (an active Entity to a different node) or passivation. Both of which are aimed to be completely + * transparent to users of such Entity. In other words, if this were to be a plain ActorRef, it would be possible to + * apply DeathWatch to it, which in turn would then trigger when the sharded Actor stopped, breaking the illusion that + * Entity refs are "always there". Please note that while not encouraged, it is possible to expose an Actor's `self` + * [[ActorRef]] and watch it in case such notification is desired. + * + * Not for user extension. + */ +@DoNotInherit abstract class EntityRef[A] { scaladslSelf: scaladsl.EntityRef[A] ⇒ + + /** + * Send a message to the entity referenced by this EntityRef using *at-most-once* + * messaging semantics. + */ + def tell(msg: A): Unit + + /** + * Allows to "ask" the [[EntityRef]] for a reply. + * See [[akka.actor.typed.javadsl.AskPattern]] for a complete write-up of this pattern + * + * Please note that a [[akka.util.Timeout]] and [[akka.actor.Scheduler]] must be available to use this pattern. + */ + def ask[U](message: JFunction[ActorRef[U], A], timeout: Timeout, scheduler: Scheduler): CompletionStage[U] + + /** + * INTERNAL API + */ + @InternalApi private[akka] def asScala: scaladsl.EntityRef[A] = scaladslSelf + +} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala new file mode 100644 index 0000000000..fd774ee445 --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -0,0 +1,307 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ +package akka.cluster.sharding.typed +package scaladsl + +import scala.concurrent.Future + +import akka.actor.Scheduler +import akka.util.Timeout + +import scala.reflect.ClassTag + +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.Extension +import akka.actor.typed.ExtensionId +import akka.actor.typed.Props +import akka.annotation.DoNotInherit +import akka.annotation.InternalApi +import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy +import akka.cluster.sharding.typed.internal.ClusterShardingImpl +import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl +import akka.cluster.sharding.ShardRegion.{ StartEntity ⇒ UntypedStartEntity } + +object ClusterSharding extends ExtensionId[ClusterSharding] { + + override def createExtension(system: ActorSystem[_]): ClusterSharding = + new ClusterShardingImpl(system) + +} + +/** + * This extension provides sharding functionality of actors in a cluster. + * The typical use case is when you have many stateful actors that together consume + * more resources (e.g. memory) than fit on one machine. You need to distribute them across + * several nodes in the cluster and you want to be able to interact with them using their + * logical identifier, but without having to care about their physical location in the cluster, + * which might also change over time. It could for example be actors representing Aggregate Roots in + * Domain-Driven Design terminology. Here we call these actors "entities". These actors + * typically have persistent (durable) state, but this feature is not limited to + * actors with persistent state. + * + * In this context sharding means that actors with an identifier, so called entities, + * can be automatically distributed across multiple nodes in the cluster. Each entity + * actor runs only at one place, and messages can be sent to the entity without requiring + * the sender to know the location of the destination actor. This is achieved by sending + * the messages via a `ShardRegion` actor provided by this extension, which knows how + * to route the message with the entity id to the final destination. + * + * This extension is supposed to be used by first, typically at system startup on each node + * in the cluster, registering the supported entity types with the [[ClusterSharding#spawn]] + * method, which returns the `ShardRegion` actor reference for a named entity type. + * Messages to the entities are always sent via that `ActorRef`, i.e. the local `ShardRegion`. + * Messages can also be sent via the [[EntityRef]] retrieved with [[ClusterSharding#entityRefFor]], + * which will also send via the local `ShardRegion`. + * + * Some settings can be configured as described in the `akka.cluster.sharding` + * section of the `reference.conf`. + * + * The `ShardRegion` actor is started on each node in the cluster, or group of nodes + * tagged with a specific role. The `ShardRegion` is created with a [[ShardingMessageExtractor]] + * to extract the entity identifier and the shard identifier from incoming messages. + * A shard is a group of entities that will be managed together. For the first message in a + * specific shard the `ShardRegion` requests the location of the shard from a central coordinator, + * the [[ShardCoordinator]]. The `ShardCoordinator` decides which `ShardRegion` + * owns the shard. The `ShardRegion` receives the decided home of the shard + * and if that is the `ShardRegion` instance itself it will create a local child + * actor representing the entity and direct all messages for that entity to it. + * If the shard home is another `ShardRegion` instance messages will be forwarded + * to that `ShardRegion` instance instead. While resolving the location of a + * shard incoming messages for that shard are buffered and later delivered when the + * shard location is known. Subsequent messages to the resolved shard can be delivered + * to the target destination immediately without involving the `ShardCoordinator`. + * + * To make sure that at most one instance of a specific entity actor is running somewhere + * in the cluster it is important that all nodes have the same view of where the shards + * are located. Therefore the shard allocation decisions are taken by the central + * `ShardCoordinator`, which is running as a cluster singleton, i.e. one instance on + * the oldest member among all cluster nodes or a group of nodes tagged with a specific + * role. The oldest member can be determined by [[akka.cluster.Member#isOlderThan]]. + * + * To be able to use newly added members in the cluster the coordinator facilitates rebalancing + * of shards, i.e. migrate entities from one node to another. In the rebalance process the + * coordinator first notifies all `ShardRegion` actors that a handoff for a shard has started. + * That means they will start buffering incoming messages for that shard, in the same way as if the + * shard location is unknown. During the rebalance process the coordinator will not answer any + * requests for the location of shards that are being rebalanced, i.e. local buffering will + * continue until the handoff is completed. The `ShardRegion` responsible for the rebalanced shard + * will stop all entities in that shard by sending the `handOffMessage` to them. When all entities have + * been terminated the `ShardRegion` owning the entities will acknowledge the handoff as completed + * to the coordinator. Thereafter the coordinator will reply to requests for the location of + * the shard and thereby allocate a new home for the shard and then buffered messages in the + * `ShardRegion` actors are delivered to the new location. This means that the state of the entities + * are not transferred or migrated. If the state of the entities are of importance it should be + * persistent (durable), e.g. with `akka-persistence`, so that it can be recovered at the new + * location. + * + * The logic that decides which shards to rebalance is defined in a plugable shard + * allocation strategy. The default implementation [[LeastShardAllocationStrategy]] + * picks shards for handoff from the `ShardRegion` with most number of previously allocated shards. + * They will then be allocated to the `ShardRegion` with least number of previously allocated shards, + * i.e. new members in the cluster. There is a configurable threshold of how large the difference + * must be to begin the rebalancing. This strategy can be replaced by an application specific + * implementation. + * + * The state of shard locations in the `ShardCoordinator` is stored with `akka-distributed-data` or + * `akka-persistence` to survive failures. When a crashed or unreachable coordinator + * node has been removed (via down) from the cluster a new `ShardCoordinator` singleton + * actor will take over and the state is recovered. During such a failure period shards + * with known location are still available, while messages for new (unknown) shards + * are buffered until the new `ShardCoordinator` becomes available. + * + * As long as a sender uses the same `ShardRegion` actor to deliver messages to an entity + * actor the order of the messages is preserved. As long as the buffer limit is not reached + * messages are delivered on a best effort basis, with at-most once delivery semantics, + * in the same way as ordinary message sending. Reliable end-to-end messaging, with + * at-least-once semantics can be added by using `AtLeastOnceDelivery` in `akka-persistence`. + * + * Some additional latency is introduced for messages targeted to new or previously + * unused shards due to the round-trip to the coordinator. Rebalancing of shards may + * also add latency. This should be considered when designing the application specific + * shard resolution, e.g. to avoid too fine grained shards. + * + * The `ShardRegion` actor can also be started in proxy only mode, i.e. it will not + * host any entities itself, but knows how to delegate messages to the right location. + * + * If the state of the entities are persistent you may stop entities that are not used to + * reduce memory consumption. This is done by the application specific implementation of + * the entity actors for example by defining receive timeout (`context.setReceiveTimeout`). + * If a message is already enqueued to the entity when it stops itself the enqueued message + * in the mailbox will be dropped. To support graceful passivation without losing such + * messages the entity actor can send [[ShardRegion.Passivate]] to its parent `ShardRegion`. + * The specified wrapped message in `Passivate` will be sent back to the entity, which is + * then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion` + * between reception of `Passivate` and termination of the entity. Such buffered messages + * are thereafter delivered to a new incarnation of the entity. + * + */ +@DoNotInherit +trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding ⇒ + + /** + * Spawn a shard region or a proxy depending on if the settings require role and if this node has + * such a role. + * + * Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the + * recipient actor. + * A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId + * [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy. + * + * @param behavior Create the behavior for an entity given a entityId + * @param typeKey A key that uniquely identifies the type of entity in this cluster + * @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced. + * @tparam A The type of command the entity accepts + */ + def spawn[A]( + behavior: String ⇒ Behavior[A], + props: Props, + typeKey: EntityTypeKey[A], + settings: ClusterShardingSettings, + maxNumberOfShards: Int, + handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] + + /** + * Spawn a shard region or a proxy depending on if the settings require role and if this node + * has such a role. + * + * @param behavior Create the behavior for an entity given a entityId + * @param typeKey A key that uniquely identifies the type of entity in this cluster + * @param entityProps Props to apply when starting an entity + * @param messageExtractor Extract entityId, shardId, and unwrap messages. + * @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards, + * [[ClusterSharding#defaultShardAllocationStrategy]] is used if `None` + * @tparam E A possible envelope around the message the entity accepts + * @tparam A The type of command the entity accepts + */ + def spawnWithMessageExtractor[E, A]( + behavior: String ⇒ Behavior[A], + entityProps: Props, + typeKey: EntityTypeKey[A], + settings: ClusterShardingSettings, + messageExtractor: ShardingMessageExtractor[E, A], + allocationStrategy: Option[ShardAllocationStrategy]): ActorRef[E] + + /** + * Create an `ActorRef`-like reference to a specific sharded entity. + * Currently you have to correctly specify the type of messages the target can handle. + * + * Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the + * here provided `entityId`. + * + * For in-depth documentation of its semantics, see [[EntityRef]]. + */ + def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A] + + /** The default ShardAllocationStrategy currently is [[LeastShardAllocationStrategy]] however could be changed in the future. */ + def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy + + /** + * INTERNAL API + */ + @InternalApi private[akka] def asJava: javadsl.ClusterSharding = javadslSelf +} + +/** Allows starting a specific Sharded Entity by its entity identifier */ +object StartEntity { + + /** + * Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the + * specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it. + */ + def apply[A](entityId: String): ShardingEnvelope[A] = { + // StartEntity isn't really of type A, but erased and StartEntity is only handled internally, not delivered to the entity + new ShardingEnvelope[A](entityId, UntypedStartEntity(entityId).asInstanceOf[A]) + } +} + +/** + * The key of an entity type, the `name` must be unique. + * + * Not for user extension. + */ +@DoNotInherit trait EntityTypeKey[T] { + def name: String +} + +object EntityTypeKey { + /** + * Creates an `EntityTypeKey`. The `name` must be unique. + */ + def apply[T](name: String)(implicit tTag: ClassTag[T]): EntityTypeKey[T] = + EntityTypeKeyImpl(name, implicitly[ClassTag[T]].runtimeClass.getName) + +} + +/** + * A reference to an sharded Entity, which allows `ActorRef`-like usage. + * + * An [[EntityRef]] is NOT an [[ActorRef]]–by design–in order to be explicit about the fact that the life-cycle + * of a sharded Entity is very different than a plain Actors. Most notably, this is shown by features of Entities + * such as re-balancing (an active Entity to a different node) or passivation. Both of which are aimed to be completely + * transparent to users of such Entity. In other words, if this were to be a plain ActorRef, it would be possible to + * apply DeathWatch to it, which in turn would then trigger when the sharded Actor stopped, breaking the illusion that + * Entity refs are "always there". Please note that while not encouraged, it is possible to expose an Actor's `self` + * [[ActorRef]] and watch it in case such notification is desired. + * Not for user extension. + */ +@DoNotInherit trait EntityRef[A] { + + /** + * Send a message to the entity referenced by this EntityRef using *at-most-once* + * messaging semantics. + */ + def tell(msg: A): Unit + + /** + * Allows to "ask" the [[EntityRef]] for a reply. + * See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern + * + * Example usage: + * {{{ + * case class Request(msg: String, replyTo: ActorRef[Reply]) + * case class Reply(msg: String) + * + * implicit val timeout = Timeout(3.seconds) + * val target: EntityRef[Request] = ... + * val f: Future[Reply] = target ? (Request("hello", _)) + * }}} + * + * Please note that an implicit [[akka.util.Timeout]] and [[akka.actor.Scheduler]] must be available to use this pattern. + */ + def ask[U](f: ActorRef[U] ⇒ A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] + +} + +object EntityRef { + implicit final class EntityRefOps[A](val ref: EntityRef[A]) extends AnyVal { + /** + * Send a message to the Actor referenced by this ActorRef using *at-most-once* + * messaging semantics. + */ + def !(msg: A): Unit = ref.tell(msg) + + /** + * Allows to "ask" the [[EntityRef]] for a reply. + * See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern + * + * Example usage: + * {{{ + * case class Request(msg: String, replyTo: ActorRef[Reply]) + * case class Reply(msg: String) + * + * implicit val timeout = Timeout(3.seconds) + * val target: EntityRef[Request] = ... + * val f: Future[Reply] = target ? (Request("hello", _)) + * }}} + * + * Please note that an implicit [[akka.util.Timeout]] and [[akka.actor.Scheduler]] must be available to use this pattern. + */ + def ?[U](message: ActorRef[U] ⇒ A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = + ref.ask(message)(timeout, scheduler) + } +} + diff --git a/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java index 0cb28e869a..4cf8097354 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ package jdoc.akka.cluster.sharding.typed; import akka.actor.typed.ActorRef; @@ -5,11 +8,18 @@ import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; import akka.actor.typed.Props; import akka.actor.typed.javadsl.Behaviors; -import akka.cluster.sharding.typed.ClusterSharding; -import akka.cluster.sharding.typed.*; import akka.cluster.typed.ClusterSingleton; import akka.cluster.typed.ClusterSingletonSettings; +//#import +import akka.cluster.sharding.typed.ClusterShardingSettings; +import akka.cluster.sharding.typed.ShardingEnvelope; +import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.cluster.sharding.typed.javadsl.EntityTypeKey; +import akka.cluster.sharding.typed.javadsl.EntityRef; + +//#import + public class ShardingCompileOnlyTest { //#counter @@ -49,7 +59,7 @@ public class ShardingCompileOnlyTest { //#spawn EntityTypeKey typeKey = EntityTypeKey.create(CounterCommand.class, "Counter"); - ActorRef> shardRegion = sharding.spawnJavadsl( + ActorRef> shardRegion = sharding.spawn( entityId -> counter(entityId,0), Props.empty(), typeKey, diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ShardingSerializerSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ShardingSerializerSpec.scala index e08edc015d..cfb35d039b 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ShardingSerializerSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ShardingSerializerSpec.scala @@ -31,7 +31,8 @@ class ShardingSerializerSpec extends TestKit with TypedAkkaSpecWithShutdown { } "must serialize and deserialize StartEntity" in { - checkSerialization(StartEntity("abc")) + checkSerialization(scaladsl.StartEntity[Int]("abc")) + checkSerialization(javadsl.StartEntity.create(classOf[java.lang.Integer], "def")) } } } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala similarity index 91% rename from akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingPersistenceSpec.scala rename to akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala index d3c67d2f1c..b0812fcc9f 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala @@ -1,9 +1,13 @@ /* * Copyright (C) 2017-2018 Lightbend Inc. */ -package akka.cluster.sharding.typed +package akka.cluster.sharding.typed.scaladsl -import akka.actor.typed.{ ActorRef, Behavior, Props, TypedAkkaSpecWithShutdown } +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.Props +import akka.actor.typed.TypedAkkaSpecWithShutdown +import akka.cluster.sharding.typed.ClusterShardingSettings import akka.cluster.typed.Cluster import akka.cluster.typed.Join import akka.persistence.typed.scaladsl.PersistentBehaviors diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala similarity index 90% rename from akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingSpec.scala rename to akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala index 1ec1c2d960..5a33db8fc0 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala @@ -2,21 +2,24 @@ * Copyright (C) 2017-2018 Lightbend Inc. */ -package akka.cluster.sharding.typed +package akka.cluster.sharding.typed.scaladsl import java.nio.charset.StandardCharsets import scala.concurrent.duration._ import akka.actor.ExtendedActorSystem -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.ActorRef import akka.actor.typed.ActorRefResolver import akka.actor.typed.ActorSystem import akka.actor.typed.Props import akka.actor.typed.TypedAkkaSpecWithShutdown +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter._ import akka.cluster.MemberStatus +import akka.cluster.sharding.typed.ClusterShardingSettings +import akka.cluster.sharding.typed.ShardingEnvelope +import akka.cluster.sharding.typed.ShardingMessageExtractor import akka.cluster.typed.Cluster import akka.cluster.typed.Join import akka.cluster.typed.Leave @@ -49,11 +52,11 @@ object ClusterShardingSpec { #allow-java-serialization = off serializers { - test = "akka.cluster.sharding.typed.ClusterShardingSpec$$Serializer" + test = "akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec$$Serializer" } serialization-bindings { - "akka.cluster.sharding.typed.ClusterShardingSpec$$TestProtocol" = test - "akka.cluster.sharding.typed.ClusterShardingSpec$$IdTestProtocol" = test + "akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec$$TestProtocol" = test + "akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec$$IdTestProtocol" = test } } """.stripMargin) @@ -133,8 +136,8 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding super.afterAll() } - val typeKey = EntityTypeKey[TestProtocol]("envelope-shard") - val behavior = Behaviors.immutable[TestProtocol] { + private val typeKey = EntityTypeKey[TestProtocol]("envelope-shard") + private val behavior = Behaviors.immutable[TestProtocol] { case (_, StopPlz()) ⇒ Behaviors.stopped @@ -148,8 +151,8 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding Behaviors.same } - val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard") - val behaviorWithId = Behaviors.immutable[IdTestProtocol] { + private val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard") + private val behaviorWithId = Behaviors.immutable[IdTestProtocol] { case (_, IdStopPlz()) ⇒ Behaviors.stopped @@ -163,7 +166,7 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding Behaviors.same } - val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.spawn[TestProtocol]( + private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.spawn( _ ⇒ behavior, Props.empty, typeKey, @@ -171,7 +174,7 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding 10, StopPlz()) - val shardingRef2 = sharding2.spawn[TestProtocol]( + private val shardingRef2 = sharding2.spawn( _ ⇒ behavior, Props.empty, typeKey, @@ -179,7 +182,7 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding 10, StopPlz()) - val shardingRef3: ActorRef[IdTestProtocol] = sharding.spawn3[IdTestProtocol, IdTestProtocol]( + private val shardingRef3: ActorRef[IdTestProtocol] = sharding.spawnWithMessageExtractor( _ ⇒ behaviorWithId, Props.empty, typeKey2, @@ -188,9 +191,10 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding case IdReplyPlz(id, _) ⇒ id case IdWhoAreYou(id, _) ⇒ id case other ⇒ throw new IllegalArgumentException(s"Unexpected message $other") - }) + }, + None) - val shardingRef4 = sharding2.spawn3[IdTestProtocol, IdTestProtocol]( + private val shardingRef4 = sharding2.spawnWithMessageExtractor( _ ⇒ behaviorWithId, Props.empty, typeKey2, @@ -199,11 +203,12 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding case IdReplyPlz(id, _) ⇒ id case IdWhoAreYou(id, _) ⇒ id case other ⇒ throw new IllegalArgumentException(s"Unexpected message $other") - }) + }, + None) def totalEntityCount1(): Int = { import akka.pattern.ask - implicit val timeout = Timeout(6.seconds) + implicit val timeout: Timeout = Timeout(6.seconds) val statsBefore = (shardingRef1.toUntyped ? akka.cluster.sharding.ShardRegion.GetClusterShardingStats(5.seconds)) .mapTo[akka.cluster.sharding.ShardRegion.ClusterShardingStats] val totalCount = statsBefore.futureValue.regions.values.flatMap(_.stats.values).sum diff --git a/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala index 4a1c998e79..176c0083b4 100644 --- a/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ package doc.akka.cluster.sharding.typed import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } @@ -11,7 +14,12 @@ object ShardingCompileOnlySpec { val system = ActorSystem(Behaviors.empty, "Sharding") //#sharding-extension - import akka.cluster.sharding.typed._ + import akka.cluster.sharding.typed.ClusterShardingSettings + import akka.cluster.sharding.typed.ShardingEnvelope + import akka.cluster.sharding.typed.scaladsl.ClusterSharding + import akka.cluster.sharding.typed.scaladsl.EntityTypeKey + import akka.cluster.sharding.typed.scaladsl.EntityRef + val sharding = ClusterSharding(system) //#sharding-extension @@ -33,7 +41,7 @@ object ShardingCompileOnlySpec { //#spawn val TypeKey = EntityTypeKey[CounterCommand]("Counter") // if a extractor is defined then the type would be ActorRef[BasicCommand] - val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.spawn[CounterCommand]( + val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.spawn( behavior = entityId ⇒ counter(entityId, 0), props = Props.empty, typeKey = TypeKey, diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 6c8482324d..801b955301 100755 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -87,11 +87,6 @@ import akka.annotation.InternalApi * the oldest member among all cluster nodes or a group of nodes tagged with a specific * role. The oldest member can be determined by [[akka.cluster.Member#isOlderThan]]. * - * The logic that decides where a shard is to be located is defined in a pluggable shard - * allocation strategy. The default implementation [[ShardCoordinator.LeastShardAllocationStrategy]] - * allocates new shards to the `ShardRegion` with least number of previously allocated shards. - * This strategy can be replaced by an application specific implementation. - * * To be able to use newly added members in the cluster the coordinator facilitates rebalancing * of shards, i.e. migrate entities from one node to another. In the rebalance process the * coordinator first notifies all `ShardRegion` actors that a handoff for a shard has started. @@ -116,9 +111,8 @@ import akka.annotation.InternalApi * must be to begin the rebalancing. This strategy can be replaced by an application specific * implementation. * - * The state of shard locations in the `ShardCoordinator` is persistent (durable) with - * `akka-persistence` to survive failures. Since it is running in a cluster `akka-persistence` - * must be configured with a distributed journal. When a crashed or unreachable coordinator + * The state of shard locations in the `ShardCoordinator` is stored with `akka-distributed-data` or + * `akka-persistence` to survive failures. When a crashed or unreachable coordinator * node has been removed (via down) from the cluster a new `ShardCoordinator` singleton * actor will take over and the state is recovered. During such a failure period shards * with known location are still available, while messages for new (unknown) shards diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index c660004bed..33166c647f 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -30,7 +30,7 @@ Scala : @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #sharding-extension } Java -: @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #sharding-extension } +: @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #import #sharding-extension } It is common for sharding to be used with persistence however any Behavior can be used with sharding e.g. a basic counter: