separate scaladsl/javadsl and avoid overloaded spawn, #24470

* copy and amend scaladoc for ClusterSharding extension
This commit is contained in:
Patrik Nordwall 2018-02-02 07:08:34 +01:00
parent 2cd1187e7b
commit ff05671422
17 changed files with 1060 additions and 718 deletions

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2018 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor.typed package akka.actor.typed
package javadsl package javadsl
@ -5,12 +8,26 @@ import java.util.concurrent.CompletionStage
import akka.actor.Scheduler import akka.actor.Scheduler
import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.AskPattern._
import akka.japi.function.Function import akka.japi.function.{ Function JFunction }
import akka.util.Timeout import akka.util.Timeout
import scala.compat.java8.FutureConverters import scala.compat.java8.FutureConverters._
/**
* The ask-pattern implements the initiator side of a requestreply protocol.
*
* Note that if you are inside of an actor you should prefer [[ActorContext.ask]]
* as that provides better safety.
*
* The party that asks may be within or without an Actor, since the
* implementation will fabricate a (hidden) [[ActorRef]] that is bound to a
* `CompletableFuture`. This ActorRef will need to be injected in the
* message that is sent to the target Actor in order to function as a reply-to
* address, therefore the argument to the ask method is not the message itself
* but a function that given the reply-to address will create the message.
*
*/
object AskPattern { object AskPattern {
def ask[T, U](actor: ActorRef[T], message: Function[ActorRef[U], T], timeout: Timeout, scheduler: Scheduler): CompletionStage[U] = def ask[T, U](actor: ActorRef[T], message: JFunction[ActorRef[U], T], timeout: Timeout, scheduler: Scheduler): CompletionStage[U] =
FutureConverters.toJava[U](actor.?(message.apply)(timeout, scheduler)) (actor.?(message.apply)(timeout, scheduler)).toJava
} }

View file

@ -26,17 +26,17 @@ public final class ShardingMessages {
akka.protobuf.ByteString akka.protobuf.ByteString
getEntityIdBytes(); getEntityIdBytes();
// optional .Payload message = 2; // required .Payload message = 2;
/** /**
* <code>optional .Payload message = 2;</code> * <code>required .Payload message = 2;</code>
*/ */
boolean hasMessage(); boolean hasMessage();
/** /**
* <code>optional .Payload message = 2;</code> * <code>required .Payload message = 2;</code>
*/ */
akka.remote.ContainerFormats.Payload getMessage(); akka.remote.ContainerFormats.Payload getMessage();
/** /**
* <code>optional .Payload message = 2;</code> * <code>required .Payload message = 2;</code>
*/ */
akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder(); akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder();
} }
@ -192,23 +192,23 @@ public final class ShardingMessages {
} }
} }
// optional .Payload message = 2; // required .Payload message = 2;
public static final int MESSAGE_FIELD_NUMBER = 2; public static final int MESSAGE_FIELD_NUMBER = 2;
private akka.remote.ContainerFormats.Payload message_; private akka.remote.ContainerFormats.Payload message_;
/** /**
* <code>optional .Payload message = 2;</code> * <code>required .Payload message = 2;</code>
*/ */
public boolean hasMessage() { public boolean hasMessage() {
return ((bitField0_ & 0x00000002) == 0x00000002); return ((bitField0_ & 0x00000002) == 0x00000002);
} }
/** /**
* <code>optional .Payload message = 2;</code> * <code>required .Payload message = 2;</code>
*/ */
public akka.remote.ContainerFormats.Payload getMessage() { public akka.remote.ContainerFormats.Payload getMessage() {
return message_; return message_;
} }
/** /**
* <code>optional .Payload message = 2;</code> * <code>required .Payload message = 2;</code>
*/ */
public akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder() { public akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder() {
return message_; return message_;
@ -227,11 +227,13 @@ public final class ShardingMessages {
memoizedIsInitialized = 0; memoizedIsInitialized = 0;
return false; return false;
} }
if (hasMessage()) { if (!hasMessage()) {
if (!getMessage().isInitialized()) { memoizedIsInitialized = 0;
memoizedIsInitialized = 0; return false;
return false; }
} if (!getMessage().isInitialized()) {
memoizedIsInitialized = 0;
return false;
} }
memoizedIsInitialized = 1; memoizedIsInitialized = 1;
return true; return true;
@ -461,11 +463,13 @@ public final class ShardingMessages {
return false; return false;
} }
if (hasMessage()) { if (!hasMessage()) {
if (!getMessage().isInitialized()) {
return false;
return false; }
} if (!getMessage().isInitialized()) {
return false;
} }
return true; return true;
} }
@ -563,18 +567,18 @@ public final class ShardingMessages {
return this; return this;
} }
// optional .Payload message = 2; // required .Payload message = 2;
private akka.remote.ContainerFormats.Payload message_ = akka.remote.ContainerFormats.Payload.getDefaultInstance(); private akka.remote.ContainerFormats.Payload message_ = akka.remote.ContainerFormats.Payload.getDefaultInstance();
private akka.protobuf.SingleFieldBuilder< private akka.protobuf.SingleFieldBuilder<
akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder> messageBuilder_; akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder> messageBuilder_;
/** /**
* <code>optional .Payload message = 2;</code> * <code>required .Payload message = 2;</code>
*/ */
public boolean hasMessage() { public boolean hasMessage() {
return ((bitField0_ & 0x00000002) == 0x00000002); return ((bitField0_ & 0x00000002) == 0x00000002);
} }
/** /**
* <code>optional .Payload message = 2;</code> * <code>required .Payload message = 2;</code>
*/ */
public akka.remote.ContainerFormats.Payload getMessage() { public akka.remote.ContainerFormats.Payload getMessage() {
if (messageBuilder_ == null) { if (messageBuilder_ == null) {
@ -584,7 +588,7 @@ public final class ShardingMessages {
} }
} }
/** /**
* <code>optional .Payload message = 2;</code> * <code>required .Payload message = 2;</code>
*/ */
public Builder setMessage(akka.remote.ContainerFormats.Payload value) { public Builder setMessage(akka.remote.ContainerFormats.Payload value) {
if (messageBuilder_ == null) { if (messageBuilder_ == null) {
@ -600,7 +604,7 @@ public final class ShardingMessages {
return this; return this;
} }
/** /**
* <code>optional .Payload message = 2;</code> * <code>required .Payload message = 2;</code>
*/ */
public Builder setMessage( public Builder setMessage(
akka.remote.ContainerFormats.Payload.Builder builderForValue) { akka.remote.ContainerFormats.Payload.Builder builderForValue) {
@ -614,7 +618,7 @@ public final class ShardingMessages {
return this; return this;
} }
/** /**
* <code>optional .Payload message = 2;</code> * <code>required .Payload message = 2;</code>
*/ */
public Builder mergeMessage(akka.remote.ContainerFormats.Payload value) { public Builder mergeMessage(akka.remote.ContainerFormats.Payload value) {
if (messageBuilder_ == null) { if (messageBuilder_ == null) {
@ -633,7 +637,7 @@ public final class ShardingMessages {
return this; return this;
} }
/** /**
* <code>optional .Payload message = 2;</code> * <code>required .Payload message = 2;</code>
*/ */
public Builder clearMessage() { public Builder clearMessage() {
if (messageBuilder_ == null) { if (messageBuilder_ == null) {
@ -646,7 +650,7 @@ public final class ShardingMessages {
return this; return this;
} }
/** /**
* <code>optional .Payload message = 2;</code> * <code>required .Payload message = 2;</code>
*/ */
public akka.remote.ContainerFormats.Payload.Builder getMessageBuilder() { public akka.remote.ContainerFormats.Payload.Builder getMessageBuilder() {
bitField0_ |= 0x00000002; bitField0_ |= 0x00000002;
@ -654,7 +658,7 @@ public final class ShardingMessages {
return getMessageFieldBuilder().getBuilder(); return getMessageFieldBuilder().getBuilder();
} }
/** /**
* <code>optional .Payload message = 2;</code> * <code>required .Payload message = 2;</code>
*/ */
public akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder() { public akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder() {
if (messageBuilder_ != null) { if (messageBuilder_ != null) {
@ -664,7 +668,7 @@ public final class ShardingMessages {
} }
} }
/** /**
* <code>optional .Payload message = 2;</code> * <code>required .Payload message = 2;</code>
*/ */
private akka.protobuf.SingleFieldBuilder< private akka.protobuf.SingleFieldBuilder<
akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder> akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder>
@ -708,7 +712,7 @@ public final class ShardingMessages {
"\n\026ShardingMessages.proto\022\033akka.cluster.s" + "\n\026ShardingMessages.proto\022\033akka.cluster.s" +
"harding.typed\032\026ContainerFormats.proto\"?\n" + "harding.typed\032\026ContainerFormats.proto\"?\n" +
"\020ShardingEnvelope\022\020\n\010entityId\030\001 \002(\t\022\031\n\007m" + "\020ShardingEnvelope\022\020\n\010entityId\030\001 \002(\t\022\031\n\007m" +
"essage\030\002 \001(\0132\010.PayloadB1\n-akka.cluster.s" + "essage\030\002 \002(\0132\010.PayloadB1\n-akka.cluster.s" +
"harding.typed.internal.protobufH\001" "harding.typed.internal.protobufH\001"
}; };
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =

View file

@ -9,5 +9,5 @@ import "ContainerFormats.proto";
message ShardingEnvelope { message ShardingEnvelope {
required string entityId = 1; required string entityId = 1;
optional Payload message = 2; required Payload message = 2;
} }

View file

@ -1,510 +0,0 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.cluster.sharding.typed
import scala.reflect.ClassTag
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.UntypedBehavior
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.actor.typed.Props
import akka.actor.typed.internal.adapter.ActorRefAdapter
import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.{ StartEntity UntypedStartEntity }
import akka.cluster.typed.Cluster
import akka.event.Logging
import akka.event.LoggingAdapter
/**
* Default envelope type that may be used with Cluster Sharding.
*
* Cluster Sharding provides a default [[HashCodeMessageExtractor]] that is able to handle
* these types of messages, by hashing the entityId into into the shardId. It is not the only,
* but a convenient way to send envelope-wrapped messages via cluster sharding.
*
* The alternative way of routing messages through sharding is to not use envelopes,
* and have the message types themselves carry identifiers.
*/
final case class ShardingEnvelope[A](entityId: String, message: A) // TODO think if should remain a case class
/** Allows starting a specific Sharded Entity by its entity identifier */
object StartEntity {
/**
* Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the
* specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it.
*/
def apply[A](entityId: String): ShardingEnvelope[A] = {
// StartEntity isn't really of type A, but erased and StartEntity is only handled internally, not delivered to the entity
new ShardingEnvelope[A](entityId, UntypedStartEntity(entityId).asInstanceOf[A])
}
/**
* Java API
*
* Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the
* specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it.
*/
def create[A](msgClass: Class[A], entityId: String): ShardingEnvelope[A] =
apply[A](entityId)
}
object ShardingMessageExtractor {
/**
* Scala API:
*
* Create the default message extractor, using envelopes to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
* This is recommended since it does not force details about sharding into the entity protocol
*/
def apply[A](maxNumberOfShards: Int, handOffStopMessage: A): ShardingMessageExtractor[ShardingEnvelope[A], A] =
new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage)
/**
* Scala API: Create a message extractor for a protocol where the entity id is available in each message.
*/
def noEnvelope[A](
maxNumberOfShards: Int,
handOffStopMessage: A)(
extractEntityId: A String): ShardingMessageExtractor[A, A] =
new HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) {
def entityId(message: A) = extractEntityId(message)
}
}
/**
* Entirely customizable typed message extractor. Prefer [[HashCodeMessageExtractor]] or
* [[HashCodeNoEnvelopeMessageExtractor]]if possible.
*
* @tparam E Possibly an Envelope around the messages accepted by the entity actor, is the same as `A` if there is no
* envelope.
* @tparam A The type of message accepted by the entity actor
*/
abstract class ShardingMessageExtractor[E, A] {
/**
* Extract the entity id from an incoming `message`. If `null` is returned
* the message will be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
*/
def entityId(message: E): String
/**
* The shard identifier for a given entity id. Only messages that passed the [[ShardingMessageExtractor#entityId]]
* function will be used as input to this function.
*/
def shardId(entityId: String): String
/**
* Extract the message to send to the entity from an incoming `message`.
* Note that the extracted message does not have to be the same as the incoming
* message to support wrapping in message envelope that is unwrapped before
* sending to the entity actor.
*/
def unwrapMessage(message: E): A
/**
* Message sent to an entity to tell it to stop, e.g. when rebalanced.
* The message defined here is not passed to `entityId`, `shardId` or `unwrapMessage`.
*/
def handOffStopMessage: A
}
/**
* Default message extractor type, using envelopes to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
* This is recommended since it does not force details about sharding into the entity protocol
*
* @tparam A The type of message accepted by the entity actor
*/
final class HashCodeMessageExtractor[A](
val maxNumberOfShards: Int,
override val handOffStopMessage: A)
extends ShardingMessageExtractor[ShardingEnvelope[A], A] {
override def entityId(envelope: ShardingEnvelope[A]): String = envelope.entityId
override def shardId(entityId: String): String = (math.abs(entityId.hashCode) % maxNumberOfShards).toString
override def unwrapMessage(envelope: ShardingEnvelope[A]): A = envelope.message
}
/**
* Default message extractor type, using a property of the message to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
* This is recommended since it does not force details about sharding into the entity protocol
*
* @tparam A The type of message accepted by the entity actor
*/
abstract class HashCodeNoEnvelopeMessageExtractor[A](
val maxNumberOfShards: Int,
override val handOffStopMessage: A)
extends ShardingMessageExtractor[A, A] {
override def shardId(entityId: String): String = (math.abs(entityId.hashCode) % maxNumberOfShards).toString
override final def unwrapMessage(message: A): A = message
override def toString = s"HashCodeNoEnvelopeMessageExtractor($maxNumberOfShards)"
}
/**
* INTERNAL API
* Extracts entityId and unwraps ShardingEnvelope and StartEntity messages.
* Other messages are delegated to the given `ShardingMessageExtractor`.
*/
@InternalApi private[akka] class ExtractorAdapter[E, A](delegate: ShardingMessageExtractor[E, A])
extends ShardingMessageExtractor[Any, A] {
override def entityId(message: Any): String = {
message match {
case ShardingEnvelope(entityId, _) entityId //also covers UntypedStartEntity in ShardingEnvelope
case UntypedStartEntity(entityId) entityId
case msg: E @unchecked delegate.entityId(msg)
}
}
override def shardId(entityId: String): String = delegate.shardId(entityId)
override def unwrapMessage(message: Any): A = {
message match {
case ShardingEnvelope(_, msg: A @unchecked)
//also covers UntypedStartEntity in ShardingEnvelope
msg
case msg: UntypedStartEntity
// not really of type A, but erased and StartEntity is only handled internally, not delivered to the entity
msg.asInstanceOf[A]
case msg: E @unchecked
delegate.unwrapMessage(msg)
}
}
override def handOffStopMessage: A = delegate.handOffStopMessage
override def toString: String = delegate.toString
}
/**
* The key of an entity type, the `name` must be unique.
*
* Not for user extension.
*/
@DoNotInherit abstract class EntityTypeKey[T] {
def name: String
}
object EntityTypeKey {
/**
* Scala API: Creates an `EntityTypeKey`. The `name` must be unique.
*/
def apply[T](name: String)(implicit tTag: ClassTag[T]): EntityTypeKey[T] =
AdaptedClusterShardingImpl.EntityTypeKeyImpl(name, implicitly[ClassTag[T]].runtimeClass.getName)
/**
* Java API: Creates an `EntityTypeKey`. The `name` must be unique.
*/
def create[T](messageClass: Class[T], name: String): EntityTypeKey[T] =
AdaptedClusterShardingImpl.EntityTypeKeyImpl(name, messageClass.getName)
}
object ClusterSharding extends ExtensionId[ClusterSharding] {
override def createExtension(system: ActorSystem[_]): ClusterSharding =
new AdaptedClusterShardingImpl(system)
/** Java API */
def get(system: ActorSystem[_]): ClusterSharding = apply(system)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object AdaptedClusterShardingImpl {
final case class EntityTypeKeyImpl[T](name: String, messageClassName: String) extends EntityTypeKey[T] {
override def toString: String = s"EntityTypeKey[$messageClassName]($name)"
}
}
/** INTERNAL API */
@InternalApi
final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSharding {
import akka.actor.typed.scaladsl.adapter._
require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted untyped actor systems can be used for cluster features")
private val cluster = Cluster(system)
private val untypedSystem = system.toUntyped
private val untypedSharding = akka.cluster.sharding.ClusterSharding(untypedSystem)
private val log: LoggingAdapter = Logging(untypedSystem, classOf[ClusterSharding])
override def spawn[A](
behavior: String Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = {
val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage)
spawn2(behavior, entityProps, typeKey, settings, extractor, defaultShardAllocationStrategy(settings))
}
override def spawnJavadsl[A](
behavior: EntityIdToBehavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = {
val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage)
spawnJavadsl(behavior, entityProps, typeKey, settings, extractor, defaultShardAllocationStrategy(settings))
}
override def spawn3[E, A](
behavior: String Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A]): ActorRef[E] =
spawn2(behavior, entityProps, typeKey, settings, messageExtractor, defaultShardAllocationStrategy(settings))
override def spawnJavadsl[E, A](
behavior: EntityIdToBehavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A]): ActorRef[E] =
spawnJavadsl(behavior, entityProps, typeKey, settings, messageExtractor, defaultShardAllocationStrategy(settings))
override def spawn2[E, A](
behavior: String Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
extractor: ShardingMessageExtractor[E, A],
allocationStrategy: ShardAllocationStrategy): ActorRef[E] = {
val untypedSettings = ClusterShardingSettings.toUntypedSettings(settings)
val extractorAdapter = new ExtractorAdapter(extractor)
val extractEntityId: ShardRegion.ExtractEntityId = {
// TODO is it possible to avoid the double evaluation of entityId
case message if extractorAdapter.entityId(message) != null
(extractorAdapter.entityId(message), extractorAdapter.unwrapMessage(message))
}
val extractShardId: ShardRegion.ExtractShardId = { message
extractorAdapter.entityId(message) match {
case null null
case eid extractorAdapter.shardId(eid)
}
}
val ref =
if (settings.shouldHostShard(cluster)) {
log.info("Starting Shard Region [{}]...", typeKey.name)
val untypedEntityPropsFactory: String akka.actor.Props = { entityId
behavior(entityId) match {
case u: UntypedBehavior[_] u.untypedProps // PersistentBehavior
case b PropsAdapter(b, entityProps)
}
}
untypedSharding.internalStart(
typeKey.name,
untypedEntityPropsFactory,
untypedSettings,
extractEntityId,
extractShardId,
defaultShardAllocationStrategy(settings),
extractor.handOffStopMessage)
} else {
system.log.info("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...")
untypedSharding.startProxy(
typeKey.name,
settings.role,
dataCenter = None, // TODO what about the multi-dc value here? issue #23689
extractEntityId,
extractShardId)
}
ActorRefAdapter(ref)
}
override def spawnJavadsl[E, A](
behavior: EntityIdToBehavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
extractor: ShardingMessageExtractor[E, A],
allocationStrategy: ShardAllocationStrategy): ActorRef[E] = {
spawn2(entityId behavior.apply(entityId), entityProps, typeKey, settings, extractor, allocationStrategy)
}
override def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A] = {
new AdaptedEntityRefImpl[A](untypedSharding.shardRegion(typeKey.name), entityId)
}
override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = {
val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold
val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance
new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance)
}
}
@FunctionalInterface
trait EntityIdToBehavior[A] {
def apply(entityId: String): Behavior[A]
}
@DoNotInherit
sealed abstract class ClusterSharding extends Extension {
/**
* Scala API: Spawn a shard region or a proxy depending on if the settings require role and if this node has
* such a role.
*
* Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the
* recipient actor.
* A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId
* [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced.
* @tparam A The type of command the entity accepts
*/
def spawn[A](
behavior: String Behavior[A],
props: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]]
/**
* Java API: Spawn a shard region or a proxy depending on if the settings require role and if this node has
* such a role.
*
* Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the
* recipient actor.
* A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId
* [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced.
* @tparam A The type of command the entity accepts
*/
def spawnJavadsl[A]( // FIXME javadsl package
behavior: EntityIdToBehavior[A],
props: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]]
/**
* Scala API: Spawn a shard region or a proxy depending on if the settings require role and if this node
* has such a role.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param messageExtractor Extract entityId, shardId, and unwrap messages.
* @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawn2[E, A](
behavior: String Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
allocationStrategy: ShardAllocationStrategy): ActorRef[E]
/**
* Java API: Spawn a shard region or a proxy depending on if the settings require role and if this node
* has such a role.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param messageExtractor Extract entityId, shardId, and unwrap messages.
* @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawnJavadsl[E, A]( // FIXME javadsl package
behavior: EntityIdToBehavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
allocationStrategy: ShardAllocationStrategy): ActorRef[E]
/**
* Scala API: Spawn a shard region or a proxy depending on if the settings require role and if this node
* has such a role.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param messageExtractor Extract entityId, shardId, and unwrap messages.
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawn3[E, A](
behavior: String Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A]): ActorRef[E]
/**
* Java API: Spawn a shard region or a proxy depending on if the settings require role and if this node
* has such a role.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param messageExtractor Extract entityId, shardId, and unwrap messages.
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawnJavadsl[E, A]( // FIXME javadsl package
behavior: EntityIdToBehavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A]): ActorRef[E]
/**
* Create an `ActorRef`-like reference to a specific sharded entity.
* Currently you have to correctly specify the type of messages the target can handle.
*
* Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the
* here provided `entityId`.
*
* FIXME a more typed version of this API will be explored in https://github.com/akka/akka/issues/23690
*
* For in-depth documentation of its semantics, see [[EntityRef]].
*/
def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A]
/** The default ShardAllocationStrategy currently is [[LeastShardAllocationStrategy]] however could be changed in the future. */
def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy
}

View file

@ -1,127 +0,0 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.cluster.sharding.typed
import akka.actor.{ InternalActorRef, Scheduler }
import akka.annotation.InternalApi
import akka.pattern.{ AskTimeoutException, PromiseActorRef }
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.AskPattern
import akka.actor.typed.scaladsl.AskPattern.PromiseRef
import akka.util.Timeout
import scala.concurrent.Future
/**
* A reference to an sharded Entity, which allows `ActorRef`-like usage.
*
* An [[EntityRef]] is NOT an [[ActorRef]]by designin order to be explicit about the fact that the life-cycle
* of a sharded Entity is very different than a plain Actors. Most notably, this is shown by features of Entities
* such as re-balancing (an active Entity to a different node) or passivation. Both of which are aimed to be completely
* transparent to users of such Entity. In other words, if this were to be a plain ActorRef, it would be possible to
* apply DeathWatch to it, which in turn would then trigger when the sharded Actor stopped, breaking the illusion that
* Entity refs are "always there". Please note that while not encouraged, it is possible to expose an Actor's `self`
* [[ActorRef]] and watch it in case such notification is desired.
*/
trait EntityRef[A] {
/**
* Send a message to the entity referenced by this EntityRef using *at-most-once*
* messaging semantics.
*/
def tell(msg: A): Unit
/**
* Allows to "ask" the [[EntityRef]] for a reply.
* See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern
*
* Example usage:
* {{{
* case class Request(msg: String, replyTo: ActorRef[Reply])
* case class Reply(msg: String)
*
* implicit val timeout = Timeout(3.seconds)
* val target: EntityRef[Request] = ...
* val f: Future[Reply] = target ? (Request("hello", _))
* }}}
*
* Please note that an implicit [[akka.util.Timeout]] and [[akka.actor.Scheduler]] must be available to use this pattern.
*/
def ask[U](f: ActorRef[U] A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U]
}
object EntityRef {
implicit final class EntityRefOps[A](val ref: EntityRef[A]) extends AnyVal {
/**
* Send a message to the Actor referenced by this ActorRef using *at-most-once*
* messaging semantics.
*/
def !(msg: A): Unit = ref.tell(msg)
/**
* Allows to "ask" the [[EntityRef]] for a reply.
* See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern
*
* Example usage:
* {{{
* case class Request(msg: String, replyTo: ActorRef[Reply])
* case class Reply(msg: String)
*
* implicit val timeout = Timeout(3.seconds)
* val target: EntityRef[Request] = ...
* val f: Future[Reply] = target ? (Request("hello", _))
* }}}
*
* Please note that an implicit [[akka.util.Timeout]] and [[akka.actor.Scheduler]] must be available to use this pattern.
*/
def ?[U](f: ActorRef[U] A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] =
ref.ask(f)(timeout, scheduler)
}
}
@InternalApi
private[akka] final class AdaptedEntityRefImpl[A](shardRegion: akka.actor.ActorRef, entityId: String) extends EntityRef[A] {
override def tell(msg: A): Unit =
shardRegion ! ShardingEnvelope(entityId, msg)
override def ask[U](f: (ActorRef[U]) A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = {
val p = new EntityPromiseRef[U](shardRegion.asInstanceOf[InternalActorRef], timeout)
val m = f(p.ref)
if (p.promiseRef ne null) p.promiseRef.messageClassName = m.getClass.getName
shardRegion ! ShardingEnvelope(entityId, m)
p.future
}
/** Similar to [[akka.actor.typed.scaladsl.AskPattern.PromiseRef]] but for an [[EntityRef]] target. */
@InternalApi
private final class EntityPromiseRef[U](untyped: InternalActorRef, timeout: Timeout) {
import akka.actor.typed.internal.{ adapter adapt }
// Note: _promiseRef mustn't have a type pattern, since it can be null
private[this] val (_ref: ActorRef[U], _future: Future[U], _promiseRef) =
if (untyped.isTerminated)
(
adapt.ActorRefAdapter[U](untyped.provider.deadLetters),
Future.failed[U](new AskTimeoutException(s"Recipient[$untyped] had already been terminated.")),
null)
else if (timeout.duration.length <= 0)
(
adapt.ActorRefAdapter[U](untyped.provider.deadLetters),
Future.failed[U](new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$untyped]")),
null
)
else {
val a = PromiseActorRef(untyped.provider, timeout, untyped, "unknown")
val b = adapt.ActorRefAdapter[U](a)
(b, a.result.future.asInstanceOf[Future[U]], a)
}
val ref: ActorRef[U] = _ref
val future: Future[U] = _future
val promiseRef: PromiseActorRef = _promiseRef
}
}

View file

@ -0,0 +1,117 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.cluster.sharding.typed
object ShardingMessageExtractor {
/**
* Scala API:
*
* Create the default message extractor, using envelopes to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
* This is recommended since it does not force details about sharding into the entity protocol
*/
def apply[A](maxNumberOfShards: Int, handOffStopMessage: A): ShardingMessageExtractor[ShardingEnvelope[A], A] =
new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage)
/**
* Scala API: Create a message extractor for a protocol where the entity id is available in each message.
*/
def noEnvelope[A](
maxNumberOfShards: Int,
handOffStopMessage: A)(
extractEntityId: A String): ShardingMessageExtractor[A, A] =
new HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) {
def entityId(message: A) = extractEntityId(message)
}
}
/**
* Entirely customizable typed message extractor. Prefer [[HashCodeMessageExtractor]] or
* [[HashCodeNoEnvelopeMessageExtractor]]if possible.
*
* @tparam E Possibly an Envelope around the messages accepted by the entity actor, is the same as `A` if there is no
* envelope.
* @tparam A The type of message accepted by the entity actor
*/
abstract class ShardingMessageExtractor[E, A] {
/**
* Extract the entity id from an incoming `message`. If `null` is returned
* the message will be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
*/
def entityId(message: E): String
/**
* The shard identifier for a given entity id. Only messages that passed the [[ShardingMessageExtractor#entityId]]
* function will be used as input to this function.
*/
def shardId(entityId: String): String
/**
* Extract the message to send to the entity from an incoming `message`.
* Note that the extracted message does not have to be the same as the incoming
* message to support wrapping in message envelope that is unwrapped before
* sending to the entity actor.
*/
def unwrapMessage(message: E): A
/**
* Message sent to an entity to tell it to stop, e.g. when rebalanced.
* The message defined here is not passed to `entityId`, `shardId` or `unwrapMessage`.
*/
def handOffStopMessage: A
}
/**
* Default message extractor type, using envelopes to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
* This is recommended since it does not force details about sharding into the entity protocol
*
* @tparam A The type of message accepted by the entity actor
*/
final class HashCodeMessageExtractor[A](
val maxNumberOfShards: Int,
override val handOffStopMessage: A)
extends ShardingMessageExtractor[ShardingEnvelope[A], A] {
override def entityId(envelope: ShardingEnvelope[A]): String = envelope.entityId
override def shardId(entityId: String): String = (math.abs(entityId.hashCode) % maxNumberOfShards).toString
override def unwrapMessage(envelope: ShardingEnvelope[A]): A = envelope.message
}
/**
* Default message extractor type, using a property of the message to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
* This is recommended since it does not force details about sharding into the entity protocol
*
* @tparam A The type of message accepted by the entity actor
*/
abstract class HashCodeNoEnvelopeMessageExtractor[A](
val maxNumberOfShards: Int,
override val handOffStopMessage: A)
extends ShardingMessageExtractor[A, A] {
override def shardId(entityId: String): String = (math.abs(entityId.hashCode) % maxNumberOfShards).toString
override final def unwrapMessage(message: A): A = message
override def toString = s"HashCodeNoEnvelopeMessageExtractor($maxNumberOfShards)"
}
/**
* Default envelope type that may be used with Cluster Sharding.
*
* Cluster Sharding provides a default [[HashCodeMessageExtractor]] that is able to handle
* these types of messages, by hashing the entityId into into the shardId. It is not the only,
* but a convenient way to send envelope-wrapped messages via cluster sharding.
*
* The alternative way of routing messages through sharding is to not use envelopes,
* and have the message types themselves carry identifiers.
*/
final case class ShardingEnvelope[A](entityId: String, message: A) // TODO think if should remain a case class

View file

@ -0,0 +1,248 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.cluster.sharding.typed
package internal
import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.compat.java8.OptionConverters._
import scala.compat.java8.FutureConverters._
import scala.concurrent.Future
import akka.actor.InternalActorRef
import akka.actor.Scheduler
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.UntypedBehavior
import akka.actor.typed.Props
import akka.actor.typed.internal.adapter.ActorRefAdapter
import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.{ StartEntity UntypedStartEntity }
import akka.cluster.sharding.typed.javadsl.EntityIdToBehavior
import akka.cluster.typed.Cluster
import akka.event.Logging
import akka.event.LoggingAdapter
import akka.pattern.AskTimeoutException
import akka.pattern.PromiseActorRef
import akka.util.Timeout
import akka.japi.function.{ Function JFunction }
/**
* INTERNAL API
* Extracts entityId and unwraps ShardingEnvelope and StartEntity messages.
* Other messages are delegated to the given `ShardingMessageExtractor`.
*/
@InternalApi private[akka] class ExtractorAdapter[E, A](delegate: ShardingMessageExtractor[E, A])
extends ShardingMessageExtractor[Any, A] {
override def entityId(message: Any): String = {
message match {
case ShardingEnvelope(entityId, _) entityId //also covers UntypedStartEntity in ShardingEnvelope
case UntypedStartEntity(entityId) entityId
case msg: E @unchecked delegate.entityId(msg)
}
}
override def shardId(entityId: String): String = delegate.shardId(entityId)
override def unwrapMessage(message: Any): A = {
message match {
case ShardingEnvelope(_, msg: A @unchecked)
//also covers UntypedStartEntity in ShardingEnvelope
msg
case msg: UntypedStartEntity
// not really of type A, but erased and StartEntity is only handled internally, not delivered to the entity
msg.asInstanceOf[A]
case msg: E @unchecked
delegate.unwrapMessage(msg)
}
}
override def handOffStopMessage: A = delegate.handOffStopMessage
override def toString: String = delegate.toString
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class EntityTypeKeyImpl[T](name: String, messageClassName: String)
extends javadsl.EntityTypeKey[T] with scaladsl.EntityTypeKey[T] {
override def toString: String = s"EntityTypeKey[$messageClassName]($name)"
}
/** INTERNAL API */
@InternalApi private[akka] final class ClusterShardingImpl(system: ActorSystem[_])
extends javadsl.ClusterSharding with scaladsl.ClusterSharding {
import akka.actor.typed.scaladsl.adapter._
require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted untyped actor systems can be used for cluster features")
private val cluster = Cluster(system)
private val untypedSystem = system.toUntyped
private val untypedSharding = akka.cluster.sharding.ClusterSharding(untypedSystem)
private val log: LoggingAdapter = Logging(untypedSystem, classOf[scaladsl.ClusterSharding])
override def spawn[A](
behavior: String Behavior[A],
entityProps: Props,
typeKey: scaladsl.EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = {
val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage)
spawnWithMessageExtractor(behavior, entityProps, typeKey, settings, extractor, Some(defaultShardAllocationStrategy(settings)))
}
override def spawn[A](
behavior: EntityIdToBehavior[A],
entityProps: Props,
typeKey: javadsl.EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = {
val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage)
spawnWithMessageExtractor(behavior, entityProps, typeKey, settings, extractor,
Optional.of(defaultShardAllocationStrategy(settings)))
}
override def spawnWithMessageExtractor[E, A](
behavior: String Behavior[A],
entityProps: Props,
typeKey: scaladsl.EntityTypeKey[A],
settings: ClusterShardingSettings,
extractor: ShardingMessageExtractor[E, A],
allocationStrategy: Option[ShardAllocationStrategy]): ActorRef[E] = {
val untypedSettings = ClusterShardingSettings.toUntypedSettings(settings)
val extractorAdapter = new ExtractorAdapter(extractor)
val extractEntityId: ShardRegion.ExtractEntityId = {
// TODO is it possible to avoid the double evaluation of entityId
case message if extractorAdapter.entityId(message) != null
(extractorAdapter.entityId(message), extractorAdapter.unwrapMessage(message))
}
val extractShardId: ShardRegion.ExtractShardId = { message
extractorAdapter.entityId(message) match {
case null null
case eid extractorAdapter.shardId(eid)
}
}
val ref =
if (settings.shouldHostShard(cluster)) {
log.info("Starting Shard Region [{}]...", typeKey.name)
val untypedEntityPropsFactory: String akka.actor.Props = { entityId
behavior(entityId) match {
case u: UntypedBehavior[_] u.untypedProps // PersistentBehavior
case b PropsAdapter(b, entityProps)
}
}
untypedSharding.internalStart(
typeKey.name,
untypedEntityPropsFactory,
untypedSettings,
extractEntityId,
extractShardId,
defaultShardAllocationStrategy(settings),
extractor.handOffStopMessage)
} else {
system.log.info("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...")
untypedSharding.startProxy(
typeKey.name,
settings.role,
dataCenter = None, // TODO what about the multi-dc value here? issue #23689
extractEntityId,
extractShardId)
}
ActorRefAdapter(ref)
}
override def spawnWithMessageExtractor[E, A](
behavior: EntityIdToBehavior[A],
entityProps: Props,
typeKey: javadsl.EntityTypeKey[A],
settings: ClusterShardingSettings,
extractor: ShardingMessageExtractor[E, A],
allocationStrategy: Optional[ShardAllocationStrategy]): ActorRef[E] = {
spawnWithMessageExtractor(entityId behavior.apply(entityId), entityProps, typeKey.asScala,
settings, extractor, allocationStrategy.asScala)
}
override def entityRefFor[A](typeKey: scaladsl.EntityTypeKey[A], entityId: String): scaladsl.EntityRef[A] = {
new EntityRefImpl[A](untypedSharding.shardRegion(typeKey.name), entityId)
}
override def entityRefFor[A](typeKey: javadsl.EntityTypeKey[A], entityId: String): javadsl.EntityRef[A] = {
new EntityRefImpl[A](untypedSharding.shardRegion(typeKey.name), entityId)
}
override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = {
val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold
val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance
new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class EntityRefImpl[A](shardRegion: akka.actor.ActorRef, entityId: String)
extends javadsl.EntityRef[A] with scaladsl.EntityRef[A] {
override def tell(msg: A): Unit =
shardRegion ! ShardingEnvelope(entityId, msg)
override def ask[U](message: (ActorRef[U]) A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = {
val replyTo = new EntityPromiseRef[U](shardRegion.asInstanceOf[InternalActorRef], timeout)
val m = message(replyTo.ref)
if (replyTo.promiseRef ne null) replyTo.promiseRef.messageClassName = m.getClass.getName
shardRegion ! ShardingEnvelope(entityId, m)
replyTo.future
}
def ask[U](message: JFunction[ActorRef[U], A], timeout: Timeout, scheduler: Scheduler): CompletionStage[U] =
ask[U](replyTo message.apply(replyTo))(timeout, scheduler).toJava
/** Similar to [[akka.actor.typed.scaladsl.AskPattern.PromiseRef]] but for an `EntityRef` target. */
@InternalApi
private final class EntityPromiseRef[U](untyped: InternalActorRef, timeout: Timeout) {
import akka.actor.typed.internal.{ adapter adapt }
// Note: _promiseRef mustn't have a type pattern, since it can be null
private[this] val (_ref: ActorRef[U], _future: Future[U], _promiseRef) =
if (untyped.isTerminated)
(
adapt.ActorRefAdapter[U](untyped.provider.deadLetters),
Future.failed[U](new AskTimeoutException(s"Recipient[$untyped] had already been terminated.")),
null)
else if (timeout.duration.length <= 0)
(
adapt.ActorRefAdapter[U](untyped.provider.deadLetters),
Future.failed[U](new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$untyped]")),
null
)
else {
val a = PromiseActorRef(untyped.provider, timeout, untyped, "unknown")
val b = adapt.ActorRefAdapter[U](a)
(b, a.result.future.asInstanceOf[Future[U]], a)
}
val ref: ActorRef[U] = _ref
val future: Future[U] = _future
val promiseRef: PromiseActorRef = _promiseRef
}
}

View file

@ -5,11 +5,12 @@ package akka.cluster.sharding.typed.internal
import java.io.NotSerializableException import java.io.NotSerializableException
import akka.cluster.sharding.typed.internal.protobuf.ShardingMessages
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.internal.protobuf.ShardingMessages
import akka.remote.serialization.WrappedPayloadSupport import akka.remote.serialization.WrappedPayloadSupport
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest } import akka.serialization.BaseSerializer
import akka.cluster.sharding.typed.{ ShardingEnvelope, StartEntity } import akka.serialization.SerializerWithStringManifest
/** /**
* INTERNAL API * INTERNAL API
@ -31,9 +32,7 @@ import akka.cluster.sharding.typed.{ ShardingEnvelope, StartEntity }
case env: ShardingEnvelope[_] case env: ShardingEnvelope[_]
val builder = ShardingMessages.ShardingEnvelope.newBuilder() val builder = ShardingMessages.ShardingEnvelope.newBuilder()
builder.setEntityId(env.entityId) builder.setEntityId(env.entityId)
// special null for StartEntity, might change with issue #23679 builder.setMessage(payloadSupport.payloadBuilder(env.message))
if (env.message != null)
builder.setMessage(payloadSupport.payloadBuilder(env.message))
builder.build().toByteArray() builder.build().toByteArray()
case _ case _
@ -44,13 +43,8 @@ import akka.cluster.sharding.typed.{ ShardingEnvelope, StartEntity }
case ShardingEnvelopeManifest case ShardingEnvelopeManifest
val env = ShardingMessages.ShardingEnvelope.parseFrom(bytes) val env = ShardingMessages.ShardingEnvelope.parseFrom(bytes)
val entityId = env.getEntityId val entityId = env.getEntityId
if (env.hasMessage) { val wrappedMsg = payloadSupport.deserializePayload(env.getMessage)
val wrappedMsg = payloadSupport.deserializePayload(env.getMessage) ShardingEnvelope(entityId, wrappedMsg)
ShardingEnvelope(entityId, wrappedMsg)
} else {
// special for StartEntity, might change with issue #23679
StartEntity(entityId)
}
case _ case _
throw new NotSerializableException( throw new NotSerializableException(
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")

View file

@ -0,0 +1,270 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.cluster.sharding.typed
package javadsl
import java.util.Optional
import java.util.concurrent.CompletionStage
import akka.actor.Scheduler
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
import akka.japi.function.{ Function JFunction }
import akka.util.Timeout
@FunctionalInterface
trait EntityIdToBehavior[A] {
def apply(entityId: String): Behavior[A]
}
object ClusterSharding {
def get(system: ActorSystem[_]): ClusterSharding =
scaladsl.ClusterSharding(system).asJava
}
/**
* This extension provides sharding functionality of actors in a cluster.
* The typical use case is when you have many stateful actors that together consume
* more resources (e.g. memory) than fit on one machine. You need to distribute them across
* several nodes in the cluster and you want to be able to interact with them using their
* logical identifier, but without having to care about their physical location in the cluster,
* which might also change over time. It could for example be actors representing Aggregate Roots in
* Domain-Driven Design terminology. Here we call these actors "entities". These actors
* typically have persistent (durable) state, but this feature is not limited to
* actors with persistent state.
*
* In this context sharding means that actors with an identifier, so called entities,
* can be automatically distributed across multiple nodes in the cluster. Each entity
* actor runs only at one place, and messages can be sent to the entity without requiring
* the sender to know the location of the destination actor. This is achieved by sending
* the messages via a `ShardRegion` actor provided by this extension, which knows how
* to route the message with the entity id to the final destination.
*
* This extension is supposed to be used by first, typically at system startup on each node
* in the cluster, registering the supported entity types with the [[ClusterSharding#spawn]]
* method, which returns the `ShardRegion` actor reference for a named entity type.
* Messages to the entities are always sent via that `ActorRef`, i.e. the local `ShardRegion`.
* Messages can also be sent via the [[EntityRef]] retrieved with [[ClusterSharding#entityRefFor]],
* which will also send via the local `ShardRegion`.
*
* Some settings can be configured as described in the `akka.cluster.sharding`
* section of the `reference.conf`.
*
* The `ShardRegion` actor is started on each node in the cluster, or group of nodes
* tagged with a specific role. The `ShardRegion` is created with a [[ShardingMessageExtractor]]
* to extract the entity identifier and the shard identifier from incoming messages.
* A shard is a group of entities that will be managed together. For the first message in a
* specific shard the `ShardRegion` requests the location of the shard from a central coordinator,
* the [[ShardCoordinator]]. The `ShardCoordinator` decides which `ShardRegion`
* owns the shard. The `ShardRegion` receives the decided home of the shard
* and if that is the `ShardRegion` instance itself it will create a local child
* actor representing the entity and direct all messages for that entity to it.
* If the shard home is another `ShardRegion` instance messages will be forwarded
* to that `ShardRegion` instance instead. While resolving the location of a
* shard incoming messages for that shard are buffered and later delivered when the
* shard location is known. Subsequent messages to the resolved shard can be delivered
* to the target destination immediately without involving the `ShardCoordinator`.
*
* To make sure that at most one instance of a specific entity actor is running somewhere
* in the cluster it is important that all nodes have the same view of where the shards
* are located. Therefore the shard allocation decisions are taken by the central
* `ShardCoordinator`, which is running as a cluster singleton, i.e. one instance on
* the oldest member among all cluster nodes or a group of nodes tagged with a specific
* role. The oldest member can be determined by [[akka.cluster.Member#isOlderThan]].
*
* To be able to use newly added members in the cluster the coordinator facilitates rebalancing
* of shards, i.e. migrate entities from one node to another. In the rebalance process the
* coordinator first notifies all `ShardRegion` actors that a handoff for a shard has started.
* That means they will start buffering incoming messages for that shard, in the same way as if the
* shard location is unknown. During the rebalance process the coordinator will not answer any
* requests for the location of shards that are being rebalanced, i.e. local buffering will
* continue until the handoff is completed. The `ShardRegion` responsible for the rebalanced shard
* will stop all entities in that shard by sending the `handOffMessage` to them. When all entities have
* been terminated the `ShardRegion` owning the entities will acknowledge the handoff as completed
* to the coordinator. Thereafter the coordinator will reply to requests for the location of
* the shard and thereby allocate a new home for the shard and then buffered messages in the
* `ShardRegion` actors are delivered to the new location. This means that the state of the entities
* are not transferred or migrated. If the state of the entities are of importance it should be
* persistent (durable), e.g. with `akka-persistence`, so that it can be recovered at the new
* location.
*
* The logic that decides which shards to rebalance is defined in a plugable shard
* allocation strategy. The default implementation [[LeastShardAllocationStrategy]]
* picks shards for handoff from the `ShardRegion` with most number of previously allocated shards.
* They will then be allocated to the `ShardRegion` with least number of previously allocated shards,
* i.e. new members in the cluster. There is a configurable threshold of how large the difference
* must be to begin the rebalancing. This strategy can be replaced by an application specific
* implementation.
*
* The state of shard locations in the `ShardCoordinator` is stored with `akka-distributed-data` or
* `akka-persistence` to survive failures. When a crashed or unreachable coordinator
* node has been removed (via down) from the cluster a new `ShardCoordinator` singleton
* actor will take over and the state is recovered. During such a failure period shards
* with known location are still available, while messages for new (unknown) shards
* are buffered until the new `ShardCoordinator` becomes available.
*
* As long as a sender uses the same `ShardRegion` actor to deliver messages to an entity
* actor the order of the messages is preserved. As long as the buffer limit is not reached
* messages are delivered on a best effort basis, with at-most once delivery semantics,
* in the same way as ordinary message sending. Reliable end-to-end messaging, with
* at-least-once semantics can be added by using `AtLeastOnceDelivery` in `akka-persistence`.
*
* Some additional latency is introduced for messages targeted to new or previously
* unused shards due to the round-trip to the coordinator. Rebalancing of shards may
* also add latency. This should be considered when designing the application specific
* shard resolution, e.g. to avoid too fine grained shards.
*
* The `ShardRegion` actor can also be started in proxy only mode, i.e. it will not
* host any entities itself, but knows how to delegate messages to the right location.
*
* If the state of the entities are persistent you may stop entities that are not used to
* reduce memory consumption. This is done by the application specific implementation of
* the entity actors for example by defining receive timeout (`context.setReceiveTimeout`).
* If a message is already enqueued to the entity when it stops itself the enqueued message
* in the mailbox will be dropped. To support graceful passivation without losing such
* messages the entity actor can send [[ShardRegion.Passivate]] to its parent `ShardRegion`.
* The specified wrapped message in `Passivate` will be sent back to the entity, which is
* then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion`
* between reception of `Passivate` and termination of the entity. Such buffered messages
* are thereafter delivered to a new incarnation of the entity.
*
*/
@DoNotInherit
abstract class ClusterSharding {
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node has
* such a role.
*
* Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the
* recipient actor.
* A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId
* [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced.
* @tparam A The type of command the entity accepts
*/
def spawn[A](
behavior: EntityIdToBehavior[A],
props: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]]
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node
* has such a role.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param messageExtractor Extract entityId, shardId, and unwrap messages.
* @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards,
* [[ClusterSharding#defaultShardAllocationStrategy]] is used if `Optional.empty`
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawnWithMessageExtractor[E, A](
behavior: EntityIdToBehavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
allocationStrategy: Optional[ShardAllocationStrategy]): ActorRef[E]
/**
* Create an `ActorRef`-like reference to a specific sharded entity.
* Currently you have to correctly specify the type of messages the target can handle.
*
* Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the
* here provided `entityId`.
*
* For in-depth documentation of its semantics, see [[EntityRef]].
*/
def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A]
/** The default ShardAllocationStrategy currently is [[LeastShardAllocationStrategy]] however could be changed in the future. */
def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy
}
/** Allows starting a specific Sharded Entity by its entity identifier */
object StartEntity {
/**
* Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the
* specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it.
*/
def create[A](msgClass: Class[A], entityId: String): ShardingEnvelope[A] =
scaladsl.StartEntity[A](entityId)
}
/**
* The key of an entity type, the `name` must be unique.
*
* Not for user extension.
*/
@DoNotInherit abstract class EntityTypeKey[T] { scaladslSelf: scaladsl.EntityTypeKey[T]
def name: String
/**
* INTERNAL API
*/
@InternalApi private[akka] def asScala: scaladsl.EntityTypeKey[T] = scaladslSelf
}
object EntityTypeKey {
/**
* Creates an `EntityTypeKey`. The `name` must be unique.
*/
def create[T](messageClass: Class[T], name: String): EntityTypeKey[T] =
EntityTypeKeyImpl(name, messageClass.getName)
}
/**
* A reference to an sharded Entity, which allows `ActorRef`-like usage.
*
* An [[EntityRef]] is NOT an [[ActorRef]]by designin order to be explicit about the fact that the life-cycle
* of a sharded Entity is very different than a plain Actors. Most notably, this is shown by features of Entities
* such as re-balancing (an active Entity to a different node) or passivation. Both of which are aimed to be completely
* transparent to users of such Entity. In other words, if this were to be a plain ActorRef, it would be possible to
* apply DeathWatch to it, which in turn would then trigger when the sharded Actor stopped, breaking the illusion that
* Entity refs are "always there". Please note that while not encouraged, it is possible to expose an Actor's `self`
* [[ActorRef]] and watch it in case such notification is desired.
*
* Not for user extension.
*/
@DoNotInherit abstract class EntityRef[A] { scaladslSelf: scaladsl.EntityRef[A]
/**
* Send a message to the entity referenced by this EntityRef using *at-most-once*
* messaging semantics.
*/
def tell(msg: A): Unit
/**
* Allows to "ask" the [[EntityRef]] for a reply.
* See [[akka.actor.typed.javadsl.AskPattern]] for a complete write-up of this pattern
*
* Please note that a [[akka.util.Timeout]] and [[akka.actor.Scheduler]] must be available to use this pattern.
*/
def ask[U](message: JFunction[ActorRef[U], A], timeout: Timeout, scheduler: Scheduler): CompletionStage[U]
/**
* INTERNAL API
*/
@InternalApi private[akka] def asScala: scaladsl.EntityRef[A] = scaladslSelf
}

View file

@ -0,0 +1,307 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.cluster.sharding.typed
package scaladsl
import scala.concurrent.Future
import akka.actor.Scheduler
import akka.util.Timeout
import scala.reflect.ClassTag
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.actor.typed.Props
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.typed.internal.ClusterShardingImpl
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
import akka.cluster.sharding.ShardRegion.{ StartEntity UntypedStartEntity }
object ClusterSharding extends ExtensionId[ClusterSharding] {
override def createExtension(system: ActorSystem[_]): ClusterSharding =
new ClusterShardingImpl(system)
}
/**
* This extension provides sharding functionality of actors in a cluster.
* The typical use case is when you have many stateful actors that together consume
* more resources (e.g. memory) than fit on one machine. You need to distribute them across
* several nodes in the cluster and you want to be able to interact with them using their
* logical identifier, but without having to care about their physical location in the cluster,
* which might also change over time. It could for example be actors representing Aggregate Roots in
* Domain-Driven Design terminology. Here we call these actors "entities". These actors
* typically have persistent (durable) state, but this feature is not limited to
* actors with persistent state.
*
* In this context sharding means that actors with an identifier, so called entities,
* can be automatically distributed across multiple nodes in the cluster. Each entity
* actor runs only at one place, and messages can be sent to the entity without requiring
* the sender to know the location of the destination actor. This is achieved by sending
* the messages via a `ShardRegion` actor provided by this extension, which knows how
* to route the message with the entity id to the final destination.
*
* This extension is supposed to be used by first, typically at system startup on each node
* in the cluster, registering the supported entity types with the [[ClusterSharding#spawn]]
* method, which returns the `ShardRegion` actor reference for a named entity type.
* Messages to the entities are always sent via that `ActorRef`, i.e. the local `ShardRegion`.
* Messages can also be sent via the [[EntityRef]] retrieved with [[ClusterSharding#entityRefFor]],
* which will also send via the local `ShardRegion`.
*
* Some settings can be configured as described in the `akka.cluster.sharding`
* section of the `reference.conf`.
*
* The `ShardRegion` actor is started on each node in the cluster, or group of nodes
* tagged with a specific role. The `ShardRegion` is created with a [[ShardingMessageExtractor]]
* to extract the entity identifier and the shard identifier from incoming messages.
* A shard is a group of entities that will be managed together. For the first message in a
* specific shard the `ShardRegion` requests the location of the shard from a central coordinator,
* the [[ShardCoordinator]]. The `ShardCoordinator` decides which `ShardRegion`
* owns the shard. The `ShardRegion` receives the decided home of the shard
* and if that is the `ShardRegion` instance itself it will create a local child
* actor representing the entity and direct all messages for that entity to it.
* If the shard home is another `ShardRegion` instance messages will be forwarded
* to that `ShardRegion` instance instead. While resolving the location of a
* shard incoming messages for that shard are buffered and later delivered when the
* shard location is known. Subsequent messages to the resolved shard can be delivered
* to the target destination immediately without involving the `ShardCoordinator`.
*
* To make sure that at most one instance of a specific entity actor is running somewhere
* in the cluster it is important that all nodes have the same view of where the shards
* are located. Therefore the shard allocation decisions are taken by the central
* `ShardCoordinator`, which is running as a cluster singleton, i.e. one instance on
* the oldest member among all cluster nodes or a group of nodes tagged with a specific
* role. The oldest member can be determined by [[akka.cluster.Member#isOlderThan]].
*
* To be able to use newly added members in the cluster the coordinator facilitates rebalancing
* of shards, i.e. migrate entities from one node to another. In the rebalance process the
* coordinator first notifies all `ShardRegion` actors that a handoff for a shard has started.
* That means they will start buffering incoming messages for that shard, in the same way as if the
* shard location is unknown. During the rebalance process the coordinator will not answer any
* requests for the location of shards that are being rebalanced, i.e. local buffering will
* continue until the handoff is completed. The `ShardRegion` responsible for the rebalanced shard
* will stop all entities in that shard by sending the `handOffMessage` to them. When all entities have
* been terminated the `ShardRegion` owning the entities will acknowledge the handoff as completed
* to the coordinator. Thereafter the coordinator will reply to requests for the location of
* the shard and thereby allocate a new home for the shard and then buffered messages in the
* `ShardRegion` actors are delivered to the new location. This means that the state of the entities
* are not transferred or migrated. If the state of the entities are of importance it should be
* persistent (durable), e.g. with `akka-persistence`, so that it can be recovered at the new
* location.
*
* The logic that decides which shards to rebalance is defined in a plugable shard
* allocation strategy. The default implementation [[LeastShardAllocationStrategy]]
* picks shards for handoff from the `ShardRegion` with most number of previously allocated shards.
* They will then be allocated to the `ShardRegion` with least number of previously allocated shards,
* i.e. new members in the cluster. There is a configurable threshold of how large the difference
* must be to begin the rebalancing. This strategy can be replaced by an application specific
* implementation.
*
* The state of shard locations in the `ShardCoordinator` is stored with `akka-distributed-data` or
* `akka-persistence` to survive failures. When a crashed or unreachable coordinator
* node has been removed (via down) from the cluster a new `ShardCoordinator` singleton
* actor will take over and the state is recovered. During such a failure period shards
* with known location are still available, while messages for new (unknown) shards
* are buffered until the new `ShardCoordinator` becomes available.
*
* As long as a sender uses the same `ShardRegion` actor to deliver messages to an entity
* actor the order of the messages is preserved. As long as the buffer limit is not reached
* messages are delivered on a best effort basis, with at-most once delivery semantics,
* in the same way as ordinary message sending. Reliable end-to-end messaging, with
* at-least-once semantics can be added by using `AtLeastOnceDelivery` in `akka-persistence`.
*
* Some additional latency is introduced for messages targeted to new or previously
* unused shards due to the round-trip to the coordinator. Rebalancing of shards may
* also add latency. This should be considered when designing the application specific
* shard resolution, e.g. to avoid too fine grained shards.
*
* The `ShardRegion` actor can also be started in proxy only mode, i.e. it will not
* host any entities itself, but knows how to delegate messages to the right location.
*
* If the state of the entities are persistent you may stop entities that are not used to
* reduce memory consumption. This is done by the application specific implementation of
* the entity actors for example by defining receive timeout (`context.setReceiveTimeout`).
* If a message is already enqueued to the entity when it stops itself the enqueued message
* in the mailbox will be dropped. To support graceful passivation without losing such
* messages the entity actor can send [[ShardRegion.Passivate]] to its parent `ShardRegion`.
* The specified wrapped message in `Passivate` will be sent back to the entity, which is
* then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion`
* between reception of `Passivate` and termination of the entity. Such buffered messages
* are thereafter delivered to a new incarnation of the entity.
*
*/
@DoNotInherit
trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node has
* such a role.
*
* Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the
* recipient actor.
* A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId
* [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced.
* @tparam A The type of command the entity accepts
*/
def spawn[A](
behavior: String Behavior[A],
props: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]]
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node
* has such a role.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param messageExtractor Extract entityId, shardId, and unwrap messages.
* @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards,
* [[ClusterSharding#defaultShardAllocationStrategy]] is used if `None`
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawnWithMessageExtractor[E, A](
behavior: String Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
allocationStrategy: Option[ShardAllocationStrategy]): ActorRef[E]
/**
* Create an `ActorRef`-like reference to a specific sharded entity.
* Currently you have to correctly specify the type of messages the target can handle.
*
* Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the
* here provided `entityId`.
*
* For in-depth documentation of its semantics, see [[EntityRef]].
*/
def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A]
/** The default ShardAllocationStrategy currently is [[LeastShardAllocationStrategy]] however could be changed in the future. */
def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy
/**
* INTERNAL API
*/
@InternalApi private[akka] def asJava: javadsl.ClusterSharding = javadslSelf
}
/** Allows starting a specific Sharded Entity by its entity identifier */
object StartEntity {
/**
* Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the
* specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it.
*/
def apply[A](entityId: String): ShardingEnvelope[A] = {
// StartEntity isn't really of type A, but erased and StartEntity is only handled internally, not delivered to the entity
new ShardingEnvelope[A](entityId, UntypedStartEntity(entityId).asInstanceOf[A])
}
}
/**
* The key of an entity type, the `name` must be unique.
*
* Not for user extension.
*/
@DoNotInherit trait EntityTypeKey[T] {
def name: String
}
object EntityTypeKey {
/**
* Creates an `EntityTypeKey`. The `name` must be unique.
*/
def apply[T](name: String)(implicit tTag: ClassTag[T]): EntityTypeKey[T] =
EntityTypeKeyImpl(name, implicitly[ClassTag[T]].runtimeClass.getName)
}
/**
* A reference to an sharded Entity, which allows `ActorRef`-like usage.
*
* An [[EntityRef]] is NOT an [[ActorRef]]by designin order to be explicit about the fact that the life-cycle
* of a sharded Entity is very different than a plain Actors. Most notably, this is shown by features of Entities
* such as re-balancing (an active Entity to a different node) or passivation. Both of which are aimed to be completely
* transparent to users of such Entity. In other words, if this were to be a plain ActorRef, it would be possible to
* apply DeathWatch to it, which in turn would then trigger when the sharded Actor stopped, breaking the illusion that
* Entity refs are "always there". Please note that while not encouraged, it is possible to expose an Actor's `self`
* [[ActorRef]] and watch it in case such notification is desired.
* Not for user extension.
*/
@DoNotInherit trait EntityRef[A] {
/**
* Send a message to the entity referenced by this EntityRef using *at-most-once*
* messaging semantics.
*/
def tell(msg: A): Unit
/**
* Allows to "ask" the [[EntityRef]] for a reply.
* See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern
*
* Example usage:
* {{{
* case class Request(msg: String, replyTo: ActorRef[Reply])
* case class Reply(msg: String)
*
* implicit val timeout = Timeout(3.seconds)
* val target: EntityRef[Request] = ...
* val f: Future[Reply] = target ? (Request("hello", _))
* }}}
*
* Please note that an implicit [[akka.util.Timeout]] and [[akka.actor.Scheduler]] must be available to use this pattern.
*/
def ask[U](f: ActorRef[U] A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U]
}
object EntityRef {
implicit final class EntityRefOps[A](val ref: EntityRef[A]) extends AnyVal {
/**
* Send a message to the Actor referenced by this ActorRef using *at-most-once*
* messaging semantics.
*/
def !(msg: A): Unit = ref.tell(msg)
/**
* Allows to "ask" the [[EntityRef]] for a reply.
* See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern
*
* Example usage:
* {{{
* case class Request(msg: String, replyTo: ActorRef[Reply])
* case class Reply(msg: String)
*
* implicit val timeout = Timeout(3.seconds)
* val target: EntityRef[Request] = ...
* val f: Future[Reply] = target ? (Request("hello", _))
* }}}
*
* Please note that an implicit [[akka.util.Timeout]] and [[akka.actor.Scheduler]] must be available to use this pattern.
*/
def ?[U](message: ActorRef[U] A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] =
ref.ask(message)(timeout, scheduler)
}
}

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2018 Lightbend Inc. <http://www.lightbend.com>
*/
package jdoc.akka.cluster.sharding.typed; package jdoc.akka.cluster.sharding.typed;
import akka.actor.typed.ActorRef; import akka.actor.typed.ActorRef;
@ -5,11 +8,18 @@ import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior; import akka.actor.typed.Behavior;
import akka.actor.typed.Props; import akka.actor.typed.Props;
import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.sharding.typed.ClusterSharding;
import akka.cluster.sharding.typed.*;
import akka.cluster.typed.ClusterSingleton; import akka.cluster.typed.ClusterSingleton;
import akka.cluster.typed.ClusterSingletonSettings; import akka.cluster.typed.ClusterSingletonSettings;
//#import
import akka.cluster.sharding.typed.ClusterShardingSettings;
import akka.cluster.sharding.typed.ShardingEnvelope;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EntityRef;
//#import
public class ShardingCompileOnlyTest { public class ShardingCompileOnlyTest {
//#counter //#counter
@ -49,7 +59,7 @@ public class ShardingCompileOnlyTest {
//#spawn //#spawn
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter"); EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");
ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.spawnJavadsl( ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.spawn(
entityId -> counter(entityId,0), entityId -> counter(entityId,0),
Props.empty(), Props.empty(),
typeKey, typeKey,

View file

@ -31,7 +31,8 @@ class ShardingSerializerSpec extends TestKit with TypedAkkaSpecWithShutdown {
} }
"must serialize and deserialize StartEntity" in { "must serialize and deserialize StartEntity" in {
checkSerialization(StartEntity("abc")) checkSerialization(scaladsl.StartEntity[Int]("abc"))
checkSerialization(javadsl.StartEntity.create(classOf[java.lang.Integer], "def"))
} }
} }
} }

View file

@ -1,9 +1,13 @@
/* /*
* Copyright (C) 2017-2018 Lightbend Inc. <http://www.lightbend.com/> * Copyright (C) 2017-2018 Lightbend Inc. <http://www.lightbend.com/>
*/ */
package akka.cluster.sharding.typed package akka.cluster.sharding.typed.scaladsl
import akka.actor.typed.{ ActorRef, Behavior, Props, TypedAkkaSpecWithShutdown } import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.actor.typed.TypedAkkaSpecWithShutdown
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.typed.Cluster import akka.cluster.typed.Cluster
import akka.cluster.typed.Join import akka.cluster.typed.Join
import akka.persistence.typed.scaladsl.PersistentBehaviors import akka.persistence.typed.scaladsl.PersistentBehaviors

View file

@ -2,21 +2,24 @@
* Copyright (C) 2017-2018 Lightbend Inc. <http://www.lightbend.com/> * Copyright (C) 2017-2018 Lightbend Inc. <http://www.lightbend.com/>
*/ */
package akka.cluster.sharding.typed package akka.cluster.sharding.typed.scaladsl
import java.nio.charset.StandardCharsets import java.nio.charset.StandardCharsets
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.ActorRefResolver import akka.actor.typed.ActorRefResolver
import akka.actor.typed.ActorSystem import akka.actor.typed.ActorSystem
import akka.actor.typed.Props import akka.actor.typed.Props
import akka.actor.typed.TypedAkkaSpecWithShutdown import akka.actor.typed.TypedAkkaSpecWithShutdown
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.MemberStatus import akka.cluster.MemberStatus
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.ShardingMessageExtractor
import akka.cluster.typed.Cluster import akka.cluster.typed.Cluster
import akka.cluster.typed.Join import akka.cluster.typed.Join
import akka.cluster.typed.Leave import akka.cluster.typed.Leave
@ -49,11 +52,11 @@ object ClusterShardingSpec {
#allow-java-serialization = off #allow-java-serialization = off
serializers { serializers {
test = "akka.cluster.sharding.typed.ClusterShardingSpec$$Serializer" test = "akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec$$Serializer"
} }
serialization-bindings { serialization-bindings {
"akka.cluster.sharding.typed.ClusterShardingSpec$$TestProtocol" = test "akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec$$TestProtocol" = test
"akka.cluster.sharding.typed.ClusterShardingSpec$$IdTestProtocol" = test "akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec$$IdTestProtocol" = test
} }
} }
""".stripMargin) """.stripMargin)
@ -133,8 +136,8 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding
super.afterAll() super.afterAll()
} }
val typeKey = EntityTypeKey[TestProtocol]("envelope-shard") private val typeKey = EntityTypeKey[TestProtocol]("envelope-shard")
val behavior = Behaviors.immutable[TestProtocol] { private val behavior = Behaviors.immutable[TestProtocol] {
case (_, StopPlz()) case (_, StopPlz())
Behaviors.stopped Behaviors.stopped
@ -148,8 +151,8 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding
Behaviors.same Behaviors.same
} }
val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard") private val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard")
val behaviorWithId = Behaviors.immutable[IdTestProtocol] { private val behaviorWithId = Behaviors.immutable[IdTestProtocol] {
case (_, IdStopPlz()) case (_, IdStopPlz())
Behaviors.stopped Behaviors.stopped
@ -163,7 +166,7 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding
Behaviors.same Behaviors.same
} }
val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.spawn[TestProtocol]( private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.spawn(
_ behavior, _ behavior,
Props.empty, Props.empty,
typeKey, typeKey,
@ -171,7 +174,7 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding
10, 10,
StopPlz()) StopPlz())
val shardingRef2 = sharding2.spawn[TestProtocol]( private val shardingRef2 = sharding2.spawn(
_ behavior, _ behavior,
Props.empty, Props.empty,
typeKey, typeKey,
@ -179,7 +182,7 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding
10, 10,
StopPlz()) StopPlz())
val shardingRef3: ActorRef[IdTestProtocol] = sharding.spawn3[IdTestProtocol, IdTestProtocol]( private val shardingRef3: ActorRef[IdTestProtocol] = sharding.spawnWithMessageExtractor(
_ behaviorWithId, _ behaviorWithId,
Props.empty, Props.empty,
typeKey2, typeKey2,
@ -188,9 +191,10 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding
case IdReplyPlz(id, _) id case IdReplyPlz(id, _) id
case IdWhoAreYou(id, _) id case IdWhoAreYou(id, _) id
case other throw new IllegalArgumentException(s"Unexpected message $other") case other throw new IllegalArgumentException(s"Unexpected message $other")
}) },
None)
val shardingRef4 = sharding2.spawn3[IdTestProtocol, IdTestProtocol]( private val shardingRef4 = sharding2.spawnWithMessageExtractor(
_ behaviorWithId, _ behaviorWithId,
Props.empty, Props.empty,
typeKey2, typeKey2,
@ -199,11 +203,12 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding
case IdReplyPlz(id, _) id case IdReplyPlz(id, _) id
case IdWhoAreYou(id, _) id case IdWhoAreYou(id, _) id
case other throw new IllegalArgumentException(s"Unexpected message $other") case other throw new IllegalArgumentException(s"Unexpected message $other")
}) },
None)
def totalEntityCount1(): Int = { def totalEntityCount1(): Int = {
import akka.pattern.ask import akka.pattern.ask
implicit val timeout = Timeout(6.seconds) implicit val timeout: Timeout = Timeout(6.seconds)
val statsBefore = (shardingRef1.toUntyped ? akka.cluster.sharding.ShardRegion.GetClusterShardingStats(5.seconds)) val statsBefore = (shardingRef1.toUntyped ? akka.cluster.sharding.ShardRegion.GetClusterShardingStats(5.seconds))
.mapTo[akka.cluster.sharding.ShardRegion.ClusterShardingStats] .mapTo[akka.cluster.sharding.ShardRegion.ClusterShardingStats]
val totalCount = statsBefore.futureValue.regions.values.flatMap(_.stats.values).sum val totalCount = statsBefore.futureValue.regions.values.flatMap(_.stats.values).sum

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2018 Lightbend Inc. <http://www.lightbend.com>
*/
package doc.akka.cluster.sharding.typed package doc.akka.cluster.sharding.typed
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
@ -11,7 +14,12 @@ object ShardingCompileOnlySpec {
val system = ActorSystem(Behaviors.empty, "Sharding") val system = ActorSystem(Behaviors.empty, "Sharding")
//#sharding-extension //#sharding-extension
import akka.cluster.sharding.typed._ import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.EntityRef
val sharding = ClusterSharding(system) val sharding = ClusterSharding(system)
//#sharding-extension //#sharding-extension
@ -33,7 +41,7 @@ object ShardingCompileOnlySpec {
//#spawn //#spawn
val TypeKey = EntityTypeKey[CounterCommand]("Counter") val TypeKey = EntityTypeKey[CounterCommand]("Counter")
// if a extractor is defined then the type would be ActorRef[BasicCommand] // if a extractor is defined then the type would be ActorRef[BasicCommand]
val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.spawn[CounterCommand]( val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.spawn(
behavior = entityId counter(entityId, 0), behavior = entityId counter(entityId, 0),
props = Props.empty, props = Props.empty,
typeKey = TypeKey, typeKey = TypeKey,

View file

@ -87,11 +87,6 @@ import akka.annotation.InternalApi
* the oldest member among all cluster nodes or a group of nodes tagged with a specific * the oldest member among all cluster nodes or a group of nodes tagged with a specific
* role. The oldest member can be determined by [[akka.cluster.Member#isOlderThan]]. * role. The oldest member can be determined by [[akka.cluster.Member#isOlderThan]].
* *
* The logic that decides where a shard is to be located is defined in a pluggable shard
* allocation strategy. The default implementation [[ShardCoordinator.LeastShardAllocationStrategy]]
* allocates new shards to the `ShardRegion` with least number of previously allocated shards.
* This strategy can be replaced by an application specific implementation.
*
* To be able to use newly added members in the cluster the coordinator facilitates rebalancing * To be able to use newly added members in the cluster the coordinator facilitates rebalancing
* of shards, i.e. migrate entities from one node to another. In the rebalance process the * of shards, i.e. migrate entities from one node to another. In the rebalance process the
* coordinator first notifies all `ShardRegion` actors that a handoff for a shard has started. * coordinator first notifies all `ShardRegion` actors that a handoff for a shard has started.
@ -116,9 +111,8 @@ import akka.annotation.InternalApi
* must be to begin the rebalancing. This strategy can be replaced by an application specific * must be to begin the rebalancing. This strategy can be replaced by an application specific
* implementation. * implementation.
* *
* The state of shard locations in the `ShardCoordinator` is persistent (durable) with * The state of shard locations in the `ShardCoordinator` is stored with `akka-distributed-data` or
* `akka-persistence` to survive failures. Since it is running in a cluster `akka-persistence` * `akka-persistence` to survive failures. When a crashed or unreachable coordinator
* must be configured with a distributed journal. When a crashed or unreachable coordinator
* node has been removed (via down) from the cluster a new `ShardCoordinator` singleton * node has been removed (via down) from the cluster a new `ShardCoordinator` singleton
* actor will take over and the state is recovered. During such a failure period shards * actor will take over and the state is recovered. During such a failure period shards
* with known location are still available, while messages for new (unknown) shards * with known location are still available, while messages for new (unknown) shards

View file

@ -30,7 +30,7 @@ Scala
: @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #sharding-extension } : @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #sharding-extension }
Java Java
: @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #sharding-extension } : @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #import #sharding-extension }
It is common for sharding to be used with persistence however any Behavior can be used with sharding e.g. a basic counter: It is common for sharding to be used with persistence however any Behavior can be used with sharding e.g. a basic counter: