Merge pull request #25697 from akka/wip-PersistentEntity-patriknw

PersistentEntity to glue together Sharding and PersistentBehavior better
This commit is contained in:
Patrik Nordwall 2018-10-18 13:11:18 +02:00 committed by GitHub
commit 1d4fb852d0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 670 additions and 165 deletions

View file

@ -30,6 +30,7 @@ import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.{ StartEntity UntypedStartEntity } import akka.cluster.sharding.ShardRegion.{ StartEntity UntypedStartEntity }
import akka.cluster.sharding.typed.scaladsl.EntityContext
import akka.cluster.typed.Cluster import akka.cluster.typed.Cluster
import akka.event.Logging import akka.event.Logging
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
@ -130,37 +131,40 @@ import akka.util.Timeout
private val shardCommandActors: ConcurrentHashMap[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] = new ConcurrentHashMap private val shardCommandActors: ConcurrentHashMap[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] = new ConcurrentHashMap
// scaladsl impl // scaladsl impl
override def start[M, E](shardedEntity: scaladsl.ShardedEntity[M, E]): ActorRef[E] = { override def start[M, E](entity: scaladsl.Entity[M, E]): ActorRef[E] = {
val settings = shardedEntity.settings match { val settings = entity.settings match {
case None ClusterShardingSettings(system) case None ClusterShardingSettings(system)
case Some(s) s case Some(s) s
} }
val extractor = (shardedEntity.messageExtractor match { val extractor = (entity.messageExtractor match {
case None new HashCodeMessageExtractor[M](settings.numberOfShards) case None new HashCodeMessageExtractor[M](settings.numberOfShards)
case Some(e) e case Some(e) e
}).asInstanceOf[ShardingMessageExtractor[E, M]] }).asInstanceOf[ShardingMessageExtractor[E, M]]
internalStart(shardedEntity.create, shardedEntity.entityProps, shardedEntity.typeKey, internalStart(entity.createBehavior, entity.entityProps, entity.typeKey,
shardedEntity.stopMessage, settings, extractor, shardedEntity.allocationStrategy) entity.stopMessage, settings, extractor, entity.allocationStrategy)
} }
// javadsl impl // javadsl impl
override def start[M, E](shardedEntity: javadsl.ShardedEntity[M, E]): ActorRef[E] = { override def start[M, E](entity: javadsl.Entity[M, E]): ActorRef[E] = {
import scala.compat.java8.OptionConverters._ import scala.compat.java8.OptionConverters._
start(new scaladsl.ShardedEntity( start(new scaladsl.Entity(
create = (shard, entitityId) shardedEntity.createBehavior.apply(shard, entitityId), createBehavior = (ctx: EntityContext) Behaviors.setup[M] { actorContext
typeKey = shardedEntity.typeKey.asScala, entity.createBehavior(
stopMessage = shardedEntity.stopMessage, new javadsl.EntityContext[M](ctx.entityId, ctx.shard, actorContext.asJava))
entityProps = shardedEntity.entityProps, },
settings = shardedEntity.settings.asScala, typeKey = entity.typeKey.asScala,
messageExtractor = shardedEntity.messageExtractor.asScala, stopMessage = entity.stopMessage,
allocationStrategy = shardedEntity.allocationStrategy.asScala entityProps = entity.entityProps,
settings = entity.settings.asScala,
messageExtractor = entity.messageExtractor.asScala,
allocationStrategy = entity.allocationStrategy.asScala
)) ))
} }
private def internalStart[M, E]( private def internalStart[M, E](
behavior: (ActorRef[scaladsl.ClusterSharding.ShardCommand], String) Behavior[M], behavior: EntityContext Behavior[M],
entityProps: Props, entityProps: Props,
typeKey: scaladsl.EntityTypeKey[M], typeKey: scaladsl.EntityTypeKey[M],
stopMessage: M, stopMessage: M,
@ -185,7 +189,7 @@ import akka.util.Timeout
if (settings.shouldHostShard(cluster)) { if (settings.shouldHostShard(cluster)) {
log.info("Starting Shard Region [{}]...", typeKey.name) log.info("Starting Shard Region [{}]...", typeKey.name)
val shardCommandDelegator = val shardCommandDelegator: ActorRef[scaladsl.ClusterSharding.ShardCommand] =
shardCommandActors.computeIfAbsent( shardCommandActors.computeIfAbsent(
typeKey.name, typeKey.name,
new java.util.function.Function[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] { new java.util.function.Function[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] {
@ -198,7 +202,7 @@ import akka.util.Timeout
}) })
val untypedEntityPropsFactory: String akka.actor.Props = { entityId val untypedEntityPropsFactory: String akka.actor.Props = { entityId
PropsAdapter(behavior(shardCommandDelegator, entityId), entityProps) PropsAdapter(behavior(new EntityContext(entityId, shardCommandDelegator)), entityProps)
} }
untypedSharding.internalStart( untypedSharding.internalStart(

View file

@ -15,6 +15,7 @@ import akka.actor.typed.Behavior
import akka.actor.typed.RecipientRef import akka.actor.typed.RecipientRef
import akka.actor.typed.Props import akka.actor.typed.Props
import akka.actor.typed.internal.InternalRecipientRef import akka.actor.typed.internal.InternalRecipientRef
import akka.actor.typed.javadsl.ActorContext
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
@ -167,7 +168,7 @@ object ClusterSharding {
abstract class ClusterSharding { abstract class ClusterSharding {
/** /**
* Initialize sharding for the given `shardedEntity` factory settings. * Initialize sharding for the given `entity` factory settings.
* *
* It will start a shard region or a proxy depending on if the settings require role and if this node has * It will start a shard region or a proxy depending on if the settings require role and if this node has
* such a role. * such a role.
@ -175,7 +176,7 @@ abstract class ClusterSharding {
* @tparam M The type of message the entity accepts * @tparam M The type of message the entity accepts
* @tparam E A possible envelope around the message the entity accepts * @tparam E A possible envelope around the message the entity accepts
*/ */
def start[M, E](shardedEntity: ShardedEntity[M, E]): ActorRef[E] def start[M, E](entity: Entity[M, E]): ActorRef[E]
/** /**
* Create an `ActorRef`-like reference to a specific sharded entity. * Create an `ActorRef`-like reference to a specific sharded entity.
@ -195,49 +196,62 @@ abstract class ClusterSharding {
def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy
} }
object ShardedEntity { object Entity {
/** /**
* Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional * Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional
* settings can be defined using the `with` methods of the returned [[ShardedEntity]]. * settings can be defined using the `with` methods of the returned [[Entity]].
*
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent actors
* is very common and therefore the [[Entity.ofPersistentEntity]] is provided as convenience.
* *
* @param createBehavior Create the behavior for an entity given an entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster * @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param createBehavior Create the behavior for an entity given a [[EntityContext]] (includes entityId)
* @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated. * @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
*
* @tparam M The type of message the entity accepts * @tparam M The type of message the entity accepts
*/ */
def create[M]( def of[M](
createBehavior: JFunction[String, Behavior[M]],
typeKey: EntityTypeKey[M], typeKey: EntityTypeKey[M],
stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] = { createBehavior: JFunction[EntityContext[M], Behavior[M]],
create(new BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]] { stopMessage: M): Entity[M, ShardingEnvelope[M]] = {
override def apply(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[M] = new Entity(createBehavior, typeKey, stopMessage, Props.empty, Optional.empty(), Optional.empty(), Optional.empty())
createBehavior.apply(entityId)
}, typeKey, stopMessage)
} }
/** /**
* Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional * Defines how the [[PersistentEntity]] should be created. Used in [[ClusterSharding#start]]. Any [[Behavior]] can
* settings can be defined using the `with` methods of the returned [[ShardedEntity]]. * be used as a sharded entity actor, but the combination of sharding and persistent actors is very common
* and therefore this factory is provided as convenience.
*
* More optional settings can be defined using the `with` methods of the returned [[Entity]].
* *
* @param createBehavior Create the behavior for an entity given `ShardCommand` ref and an entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster * @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param createPersistentEntity Create the `PersistentEntity` for an entity given a [[EntityContext]] (includes entityId)
* @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated. * @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
* @tparam M The type of message the entity accepts * @tparam Command The type of message the entity accepts
*/ */
def create[M]( def ofPersistentEntity[Command, Event, State >: Null](
createBehavior: BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]], typeKey: EntityTypeKey[Command],
typeKey: EntityTypeKey[M], createPersistentEntity: JFunction[EntityContext[Command], PersistentEntity[Command, Event, State]],
stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] = stopMessage: Command): Entity[Command, ShardingEnvelope[Command]] = {
new ShardedEntity(createBehavior, typeKey, stopMessage, Props.empty, Optional.empty(), Optional.empty(), Optional.empty())
of(typeKey, new JFunction[EntityContext[Command], Behavior[Command]] {
override def apply(ctx: EntityContext[Command]): Behavior[Command] = {
val persistentEntity = createPersistentEntity(ctx)
if (persistentEntity.entityTypeKey != typeKey)
throw new IllegalArgumentException(s"The [${persistentEntity.entityTypeKey}] of the PersistentEntity " +
s" [${persistentEntity.getClass.getName}] doesn't match expected $typeKey.")
persistentEntity
}
}, stopMessage)
}
} }
/** /**
* Defines how the entity should be created. Used in [[ClusterSharding#start]]. * Defines how the entity should be created. Used in [[ClusterSharding#start]].
*/ */
final class ShardedEntity[M, E] private[akka] ( final class Entity[M, E] private[akka] (
val createBehavior: BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]], val createBehavior: JFunction[EntityContext[M], Behavior[M]],
val typeKey: EntityTypeKey[M], val typeKey: EntityTypeKey[M],
val stopMessage: M, val stopMessage: M,
val entityProps: Props, val entityProps: Props,
@ -248,13 +262,13 @@ final class ShardedEntity[M, E] private[akka] (
/** /**
* [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings. * [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings.
*/ */
def withEntityProps(newEntityProps: Props): ShardedEntity[M, E] = def withEntityProps(newEntityProps: Props): Entity[M, E] =
copy(entityProps = newEntityProps) copy(entityProps = newEntityProps)
/** /**
* Additional settings, typically loaded from configuration. * Additional settings, typically loaded from configuration.
*/ */
def withSettings(newSettings: ClusterShardingSettings): ShardedEntity[M, E] = def withSettings(newSettings: ClusterShardingSettings): Entity[M, E] =
copy(settings = Optional.ofNullable(newSettings)) copy(settings = Optional.ofNullable(newSettings))
/** /**
@ -265,29 +279,44 @@ final class ShardedEntity[M, E] private[akka] (
* shards is then defined by `numberOfShards` in `ClusterShardingSettings`, which by default * shards is then defined by `numberOfShards` in `ClusterShardingSettings`, which by default
* is configured with `akka.cluster.sharding.number-of-shards`. * is configured with `akka.cluster.sharding.number-of-shards`.
*/ */
def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): ShardedEntity[M, Envelope] = def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): Entity[M, Envelope] =
new ShardedEntity(createBehavior, typeKey, stopMessage, entityProps, settings, Optional.ofNullable(newExtractor), allocationStrategy) new Entity(createBehavior, typeKey, stopMessage, entityProps, settings, Optional.ofNullable(newExtractor), allocationStrategy)
/** /**
* Allocation strategy which decides on which nodes to allocate new shards, * Allocation strategy which decides on which nodes to allocate new shards,
* [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified. * [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified.
*/ */
def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): ShardedEntity[M, E] = def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): Entity[M, E] =
copy(allocationStrategy = Optional.ofNullable(newAllocationStrategy)) copy(allocationStrategy = Optional.ofNullable(newAllocationStrategy))
private def copy( private def copy(
create: BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]] = createBehavior, createBehavior: JFunction[EntityContext[M], Behavior[M]] = createBehavior,
typeKey: EntityTypeKey[M] = typeKey, typeKey: EntityTypeKey[M] = typeKey,
stopMessage: M = stopMessage, stopMessage: M = stopMessage,
entityProps: Props = entityProps, entityProps: Props = entityProps,
settings: Optional[ClusterShardingSettings] = settings, settings: Optional[ClusterShardingSettings] = settings,
allocationStrategy: Optional[ShardAllocationStrategy] = allocationStrategy allocationStrategy: Optional[ShardAllocationStrategy] = allocationStrategy
): ShardedEntity[M, E] = { ): Entity[M, E] = {
new ShardedEntity(create, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy) new Entity(createBehavior, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy)
} }
} }
/**
* Parameter to [[Entity.of]]
*/
final class EntityContext[M](
entityId: String,
shard: ActorRef[ClusterSharding.ShardCommand],
actorContext: ActorContext[M]) {
def getEntityId: String = entityId
def getShard: ActorRef[ClusterSharding.ShardCommand] = shard
def getActorContext: ActorContext[M] = actorContext
}
/** Allows starting a specific Sharded Entity by its entity identifier */ /** Allows starting a specific Sharded Entity by its entity identifier */
object StartEntity { object StartEntity {

View file

@ -0,0 +1,37 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed.javadsl
import java.util.Optional
import scala.compat.java8.OptionConverters._
import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.javadsl.PersistentBehavior
/**
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent
* actors is very common and therefore this `PersistentEntity` class is provided as convenience.
*
* It is a [[PersistentBehavior]] and is implemented in the same way. It selects the `persistenceId`
* automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using
* [[EntityTypeKey.persistenceIdFrom]].
*/
abstract class PersistentEntity[Command, Event, State >: Null] private (
val entityTypeKey: EntityTypeKey[Command],
persistenceId: PersistenceId, supervisorStrategy: Optional[BackoffSupervisorStrategy])
extends PersistentBehavior[Command, Event, State](persistenceId, supervisorStrategy) {
def this(entityTypeKey: EntityTypeKey[Command], entityId: String) = {
this(entityTypeKey, persistenceId = entityTypeKey.persistenceIdFrom(entityId), Optional.empty[BackoffSupervisorStrategy])
}
def this(entityTypeKey: EntityTypeKey[Command], entityId: String, backoffSupervisorStrategy: BackoffSupervisorStrategy) = {
this(entityTypeKey, persistenceId = entityTypeKey.persistenceIdFrom(entityId), Optional.ofNullable(backoffSupervisorStrategy))
}
}

View file

@ -19,6 +19,7 @@ import akka.actor.typed.ExtensionSetup
import akka.actor.typed.RecipientRef import akka.actor.typed.RecipientRef
import akka.actor.typed.Props import akka.actor.typed.Props
import akka.actor.typed.internal.InternalRecipientRef import akka.actor.typed.internal.InternalRecipientRef
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
@ -26,6 +27,7 @@ import akka.cluster.sharding.typed.internal.ClusterShardingImpl
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
import akka.cluster.sharding.ShardRegion.{ StartEntity UntypedStartEntity } import akka.cluster.sharding.ShardRegion.{ StartEntity UntypedStartEntity }
import akka.persistence.typed.PersistenceId import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.PersistentBehavior
object ClusterSharding extends ExtensionId[ClusterSharding] { object ClusterSharding extends ExtensionId[ClusterSharding] {
@ -168,7 +170,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] {
trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding
/** /**
* Initialize sharding for the given `shardedEntity` factory settings. * Initialize sharding for the given `entity` factory settings.
* *
* It will start a shard region or a proxy depending on if the settings require role and if this node has * It will start a shard region or a proxy depending on if the settings require role and if this node has
* such a role. * such a role.
@ -176,7 +178,7 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding
* @tparam M The type of message the entity accepts * @tparam M The type of message the entity accepts
* @tparam E A possible envelope around the message the entity accepts * @tparam E A possible envelope around the message the entity accepts
*/ */
def start[M, E](shardedEntity: ShardedEntity[M, E]): ActorRef[E] def start[M, E](entity: Entity[M, E]): ActorRef[E]
/** /**
* Create an `ActorRef`-like reference to a specific sharded entity. * Create an `ActorRef`-like reference to a specific sharded entity.
@ -201,45 +203,33 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding
@InternalApi private[akka] def asJava: javadsl.ClusterSharding = javadslSelf @InternalApi private[akka] def asJava: javadsl.ClusterSharding = javadslSelf
} }
object ShardedEntity { object Entity {
/** /**
* Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional * Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional
* settings can be defined using the `with` methods of the returned [[ShardedEntity]]. * settings can be defined using the `with` methods of the returned [[Entity]].
*
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent actors
* is very common and therefore [[PersistentEntity]] is provided as a convenience for creating such
* [[PersistentBehavior]].
* *
* @param create Create the behavior for an entity given an entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
*
* @tparam M The type of message the entity accepts
*/
def apply[M](
create: String Behavior[M],
typeKey: EntityTypeKey[M],
stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] =
apply((_, entityId) create(entityId), typeKey, stopMessage)
/**
* Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional
* settings can be defined using the `with` methods of the returned [[ShardedEntity]].
*
* @param create Create the behavior for an entity given `ShardCommand` ref and an entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster * @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param createBehavior Create the behavior for an entity given a [[EntityContext]] (includes entityId)
* @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated. * @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
* @tparam M The type of message the entity accepts * @tparam M The type of message the entity accepts
*/ */
def apply[M]( def apply[M](
create: (ActorRef[ClusterSharding.ShardCommand], String) Behavior[M], typeKey: EntityTypeKey[M],
typeKey: EntityTypeKey[M], createBehavior: EntityContext Behavior[M],
stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] = stopMessage: M): Entity[M, ShardingEnvelope[M]] =
new ShardedEntity(create, typeKey, stopMessage, Props.empty, None, None, None) new Entity(createBehavior, typeKey, stopMessage, Props.empty, None, None, None)
} }
/** /**
* Defines how the entity should be created. Used in [[ClusterSharding#start]]. * Defines how the entity should be created. Used in [[ClusterSharding#start]].
*/ */
final class ShardedEntity[M, E] private[akka] ( final class Entity[M, E] private[akka] (
val create: (ActorRef[ClusterSharding.ShardCommand], String) Behavior[M], val createBehavior: EntityContext Behavior[M],
val typeKey: EntityTypeKey[M], val typeKey: EntityTypeKey[M],
val stopMessage: M, val stopMessage: M,
val entityProps: Props, val entityProps: Props,
@ -250,13 +240,13 @@ final class ShardedEntity[M, E] private[akka] (
/** /**
* [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings. * [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings.
*/ */
def withEntityProps(newEntityProps: Props): ShardedEntity[M, E] = def withEntityProps(newEntityProps: Props): Entity[M, E] =
copy(entityProps = newEntityProps) copy(entityProps = newEntityProps)
/** /**
* Additional settings, typically loaded from configuration. * Additional settings, typically loaded from configuration.
*/ */
def withSettings(newSettings: ClusterShardingSettings): ShardedEntity[M, E] = def withSettings(newSettings: ClusterShardingSettings): Entity[M, E] =
copy(settings = Option(newSettings)) copy(settings = Option(newSettings))
/** /**
@ -267,29 +257,36 @@ final class ShardedEntity[M, E] private[akka] (
* shards is then defined by `numberOfShards` in `ClusterShardingSettings`, which by default * shards is then defined by `numberOfShards` in `ClusterShardingSettings`, which by default
* is configured with `akka.cluster.sharding.number-of-shards`. * is configured with `akka.cluster.sharding.number-of-shards`.
*/ */
def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): ShardedEntity[M, Envelope] = def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): Entity[M, Envelope] =
new ShardedEntity(create, typeKey, stopMessage, entityProps, settings, Option(newExtractor), allocationStrategy) new Entity(createBehavior, typeKey, stopMessage, entityProps, settings, Option(newExtractor), allocationStrategy)
/** /**
* Allocation strategy which decides on which nodes to allocate new shards, * Allocation strategy which decides on which nodes to allocate new shards,
* [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified. * [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified.
*/ */
def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): ShardedEntity[M, E] = def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): Entity[M, E] =
copy(allocationStrategy = Option(newAllocationStrategy)) copy(allocationStrategy = Option(newAllocationStrategy))
private def copy( private def copy(
create: (ActorRef[ClusterSharding.ShardCommand], String) Behavior[M] = create, createBehavior: EntityContext Behavior[M] = createBehavior,
typeKey: EntityTypeKey[M] = typeKey, typeKey: EntityTypeKey[M] = typeKey,
stopMessage: M = stopMessage, stopMessage: M = stopMessage,
entityProps: Props = entityProps, entityProps: Props = entityProps,
settings: Option[ClusterShardingSettings] = settings, settings: Option[ClusterShardingSettings] = settings,
allocationStrategy: Option[ShardAllocationStrategy] = allocationStrategy allocationStrategy: Option[ShardAllocationStrategy] = allocationStrategy
): ShardedEntity[M, E] = { ): Entity[M, E] = {
new ShardedEntity(create, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy) new Entity(createBehavior, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy)
} }
} }
/**
* Parameter to [[Entity.apply]]
*/
final class EntityContext(
val entityId: String,
val shard: ActorRef[ClusterSharding.ShardCommand])
/** Allows starting a specific Sharded Entity by its entity identifier */ /** Allows starting a specific Sharded Entity by its entity identifier */
object StartEntity { object StartEntity {

View file

@ -0,0 +1,30 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed.scaladsl
import akka.actor.typed.Behavior
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.PersistentBehavior
object PersistentEntity {
/**
* Create a `Behavior` for a persistent actor that is used with Cluster Sharding.
*
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent
* actors is very common and therefore this `PersistentEntity` is provided as convenience.
*
* It is a [[PersistentBehavior]] and is implemented in the same way. It selects the `persistenceId`
* automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using
* [[EntityTypeKey.persistenceIdFrom]].
*/
def apply[Command, Event, State](
entityTypeKey: EntityTypeKey[Command],
entityId: String,
emptyState: State,
commandHandler: (State, Command) Effect[Event, State],
eventHandler: (State, Event) State): PersistentBehavior[Command, Event, State] =
PersistentBehavior(entityTypeKey.persistenceIdFrom(entityId), emptyState, commandHandler, eventHandler)
}

View file

@ -7,7 +7,7 @@ package akka.cluster.sharding.typed
import akka.actor.typed.{ ActorRef, Props } import akka.actor.typed.{ ActorRef, Props }
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.ShardedEntity import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.typed.{ MultiDcClusterActors, MultiNodeTypedClusterSpec } import akka.cluster.typed.{ MultiDcClusterActors, MultiNodeTypedClusterSpec }
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.testkit.typed.scaladsl.TestProbe
@ -69,9 +69,9 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh
"start sharding" in { "start sharding" in {
val sharding = ClusterSharding(typedSystem) val sharding = ClusterSharding(typedSystem)
val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.start( val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.start(
ShardedEntity( Entity(
_ multiDcPinger,
typeKey, typeKey,
_ multiDcPinger,
NoMore)) NoMore))
val probe = TestProbe[Pong] val probe = TestProbe[Pong]
shardRegion ! ShardingEnvelope(entityId, Ping(probe.ref)) shardRegion ! ShardingEnvelope(entityId, Ping(probe.ref))
@ -99,9 +99,9 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh
"be able to message cross dc via proxy" in { "be able to message cross dc via proxy" in {
runOn(first, second) { runOn(first, second) {
val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).start( val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).start(
ShardedEntity( Entity(
_ multiDcPinger,
typeKey, typeKey,
_ multiDcPinger,
NoMore) NoMore)
.withSettings(ClusterShardingSettings(typedSystem).withDataCenter("dc2"))) .withSettings(ClusterShardingSettings(typedSystem).withDataCenter("dc2")))
val probe = TestProbe[Pong] val probe = TestProbe[Pong]

View file

@ -0,0 +1,163 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.cluster.sharding.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.ActorContext;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletionStage;
//#persistent-entity-import
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.PersistentEntity;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.Effect;
import akka.persistence.typed.javadsl.EventHandler;
//#persistent-entity-import
//#persistent-entity-usage-import
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.util.Timeout;
//#persistent-entity-usage-import
public class HelloWorldPersistentEntityExample {
//#persistent-entity-usage
public static class HelloWorldService {
private final ActorSystem<?> system;
private final ClusterSharding sharding;
private final Timeout askTimeout = Timeout.create(Duration.ofSeconds(5));
// registration at startup
public HelloWorldService(ActorSystem<?> system) {
this.system = system;
sharding = ClusterSharding.get(system);
sharding.start(
Entity.ofPersistentEntity(
HelloWorld.ENTITY_TYPE_KEY,
ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId()),
HelloWorld.Passivate.INSTANCE));
}
// usage example
public CompletionStage<Integer> sayHello(String worldId, String whom) {
EntityRef<HelloWorld.Command> entityRef =
sharding.entityRefFor(HelloWorld.ENTITY_TYPE_KEY, worldId);
CompletionStage<HelloWorld.Greeting> result =
entityRef.ask(replyTo -> new HelloWorld.Greet(whom, replyTo), askTimeout);
return result.thenApply(greeting -> greeting.numberOfPeople);
}
}
//#persistent-entity-usage
//#persistent-entity
public static class HelloWorld extends PersistentEntity<HelloWorld.Command, HelloWorld.Greeted, HelloWorld.KnownPeople> {
// Command
interface Command {
}
public static final class Greet implements Command {
public final String whom;
public final ActorRef<Greeting> replyTo;
public Greet(String whom, ActorRef<Greeting> replyTo) {
this.whom = whom;
this.replyTo = replyTo;
}
}
enum Passivate implements Command {
INSTANCE
}
// Response
public static final class Greeting {
public final String whom;
public final int numberOfPeople;
public Greeting(String whom, int numberOfPeople) {
this.whom = whom;
this.numberOfPeople = numberOfPeople;
}
}
// Event
public static final class Greeted {
public final String whom;
public Greeted(String whom) {
this.whom = whom;
}
}
// State
static final class KnownPeople {
private Set<String> names = Collections.emptySet();
KnownPeople() {
}
private KnownPeople(Set<String> names) {
this.names = names;
}
KnownPeople add(String name) {
Set<String> newNames = new HashSet<>(names);
newNames.add(name);
return new KnownPeople(newNames);
}
int numberOfPeople() {
return names.size();
}
}
public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
EntityTypeKey.create(Command.class, "HelloWorld");
public HelloWorld(ActorContext<Command> ctx, String entityId) {
super(ENTITY_TYPE_KEY, entityId);
}
@Override
public KnownPeople emptyState() {
return new KnownPeople();
}
@Override
public CommandHandler<Command, Greeted, KnownPeople> commandHandler() {
return commandHandlerBuilder(KnownPeople.class)
.matchCommand(Greet.class, this::greet)
.matchCommand(Greet.class, this::passivate)
.build();
}
private Effect<Greeted, KnownPeople> passivate(KnownPeople state, Command cmd) {
return Effect().stop();
}
private Effect<Greeted, KnownPeople> greet(KnownPeople state, Greet cmd) {
return Effect().persist(new Greeted(cmd.whom))
.andThen(newState -> cmd.replyTo.tell(new Greeting(cmd.whom, newState.numberOfPeople())));
}
@Override
public EventHandler<KnownPeople, Greeted> eventHandler() {
return (state, evt) -> state.add(evt.whom);
}
}
//#persistent-entity
}

View file

@ -0,0 +1,69 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.cluster.sharding.typed;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Join;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import static jdocs.akka.cluster.sharding.typed.HelloWorldPersistentEntityExample.*;
import static org.junit.Assert.assertEquals;
public class HelloWorldPersistentEntityExampleTest extends JUnitSuite {
public static final Config config = ConfigFactory.parseString(
"akka.actor.provider = cluster \n" +
"akka.remote.netty.tcp.port = 0 \n" +
"akka.remote.artery.canonical.port = 0 \n" +
"akka.remote.artery.canonical.hostname = 127.0.0.1 \n" +
"akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n");
@ClassRule
public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
private ClusterSharding _sharding = null;
private ClusterSharding sharding() {
if (_sharding == null) {
// initialize first time only
Cluster cluster = Cluster.get(testKit.system());
cluster.manager().tell(new Join(cluster.selfMember().address()));
ClusterSharding sharding = ClusterSharding.get(testKit.system());
sharding.start(
Entity.ofPersistentEntity(
HelloWorld.ENTITY_TYPE_KEY,
ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId()),
HelloWorld.Passivate.INSTANCE));
_sharding = sharding;
}
return _sharding;
}
@Test
public void sayHello() {
EntityRef<HelloWorld.Command> world = sharding().entityRefFor(HelloWorld.ENTITY_TYPE_KEY, "1");
TestProbe<HelloWorld.Greeting> probe = testKit.createTestProbe(HelloWorld.Greeting.class);
world.tell(new HelloWorld.Greet("Alice", probe.getRef()));
HelloWorld.Greeting greeting1 = probe.expectMessageClass(HelloWorld.Greeting.class);
assertEquals("Alice", greeting1.whom);
assertEquals(1, greeting1.numberOfPeople);
world.tell(new HelloWorld.Greet("Bob", probe.getRef()));
HelloWorld.Greeting greeting2 = probe.expectMessageClass(HelloWorld.Greeting.class);
assertEquals("Bob", greeting2.whom);
assertEquals(2, greeting2.numberOfPeople);
}
}

View file

@ -16,7 +16,7 @@ import akka.cluster.sharding.typed.ShardingEnvelope;
import akka.cluster.sharding.typed.javadsl.ClusterSharding; import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey; import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EntityRef; import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.cluster.sharding.typed.javadsl.ShardedEntity; import akka.cluster.sharding.typed.javadsl.Entity;
//#import //#import
@ -103,9 +103,9 @@ public class ShardingCompileOnlyTest {
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter"); EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");
sharding.start( sharding.start(
ShardedEntity.create( Entity.of(
(shard, entityId) -> counter2(shard, entityId),
typeKey, typeKey,
ctx -> counter2(ctx.getShard(), ctx.getEntityId()),
new GoodByeCounter())); new GoodByeCounter()));
//#counter-passivate-start //#counter-passivate-start
} }
@ -124,9 +124,9 @@ public class ShardingCompileOnlyTest {
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter"); EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");
ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.start( ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.start(
ShardedEntity.create( Entity.of(
entityId -> counter(entityId,0),
typeKey, typeKey,
ctx -> counter(ctx.getEntityId(),0),
new GoodByeCounter())); new GoodByeCounter()));
//#start //#start
@ -148,9 +148,9 @@ public class ShardingCompileOnlyTest {
EntityTypeKey<BlogCommand> blogTypeKey = EntityTypeKey.create(BlogCommand.class, "BlogPost"); EntityTypeKey<BlogCommand> blogTypeKey = EntityTypeKey.create(BlogCommand.class, "BlogPost");
sharding.start( sharding.start(
ShardedEntity.create( Entity.of(
BlogBehavior::behavior,
blogTypeKey, blogTypeKey,
ctx -> BlogBehavior.behavior(ctx.getEntityId()),
new PassivatePost())); new PassivatePost()));
//#persistence //#persistence
} }

View file

@ -8,40 +8,27 @@ import scala.concurrent.Future
import akka.Done import akka.Done
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.typed.Cluster import akka.cluster.typed.Cluster
import akka.cluster.typed.Join import akka.cluster.typed.Join
import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior }
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.persistence.typed.ExpectingReply import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.scalatest.{ WordSpec, WordSpecLike } import org.scalatest.WordSpecLike
object ClusterShardingPersistenceSpec { object ClusterShardingPersistenceSpec {
val config = ConfigFactory.parseString( val config = ConfigFactory.parseString(
""" """
akka.actor.provider = cluster akka.actor.provider = cluster
akka.remote.artery.enabled = true
akka.remote.netty.tcp.port = 0 akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1 akka.remote.artery.canonical.hostname = 127.0.0.1
akka.cluster.jmx.multi-mbeans-in-same-jvm = on
akka.coordinated-shutdown.terminate-actor-system = off
akka.actor {
serialize-messages = off
allow-java-serialization = off
}
akka.persistence.journal.plugin = "akka.persistence.journal.inmem" akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
""".stripMargin) """)
sealed trait Command sealed trait Command
final case class Add(s: String) extends Command final case class Add(s: String) extends Command
@ -51,9 +38,10 @@ object ClusterShardingPersistenceSpec {
val typeKey = EntityTypeKey[Command]("test") val typeKey = EntityTypeKey[Command]("test")
def persistentActor(entityId: String): Behavior[Command] = def persistentEntity(entityId: String): Behavior[Command] =
PersistentBehavior[Command, String, String]( PersistentEntity[Command, String, String](
persistenceId = typeKey.persistenceIdFrom(entityId), entityTypeKey = typeKey,
entityId = entityId,
emptyState = "", emptyState = "",
commandHandler = (state, cmd) cmd match { commandHandler = (state, cmd) cmd match {
case Add(s) case Add(s)
@ -77,9 +65,9 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
"Typed cluster sharding with persistent actor" must { "Typed cluster sharding with persistent actor" must {
ClusterSharding(system).start(ShardedEntity( ClusterSharding(system).start(Entity(
entityId persistentActor(entityId),
typeKey, typeKey,
ctx persistentEntity(ctx.entityId),
StopPlz StopPlz
)) ))

View file

@ -183,19 +183,19 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
Behaviors.same Behaviors.same
} }
private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(ShardedEntity( private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(Entity(
(shard, _) behavior(shard),
typeKey, typeKey,
ctx behavior(ctx.shard),
StopPlz())) StopPlz()))
private val shardingRef2 = sharding2.start(ShardedEntity( private val shardingRef2 = sharding2.start(Entity(
(shard, _) behavior(shard),
typeKey, typeKey,
ctx behavior(ctx.shard),
StopPlz())) StopPlz()))
private val shardingRef3: ActorRef[IdTestProtocol] = sharding.start(ShardedEntity( private val shardingRef3: ActorRef[IdTestProtocol] = sharding.start(Entity(
(shard, _) behaviorWithId(shard),
typeKey2, typeKey2,
ctx behaviorWithId(ctx.shard),
IdStopPlz()) IdStopPlz())
.withMessageExtractor(ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { .withMessageExtractor(ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
case IdReplyPlz(id, _) id case IdReplyPlz(id, _) id
@ -204,9 +204,9 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
}) })
) )
private val shardingRef4 = sharding2.start(ShardedEntity( private val shardingRef4 = sharding2.start(Entity(
(shard, _) behaviorWithId(shard),
typeKey2, typeKey2,
ctx behaviorWithId(ctx.shard),
IdStopPlz()) IdStopPlz())
.withMessageExtractor( .withMessageExtractor(
ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
@ -263,9 +263,9 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
val p = TestProbe[String]() val p = TestProbe[String]()
val typeKey3 = EntityTypeKey[TestProtocol]("passivate-test") val typeKey3 = EntityTypeKey[TestProtocol]("passivate-test")
val shardingRef3: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(ShardedEntity( val shardingRef3: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(Entity(
(shard, _) behavior(shard, Some(stopProbe.ref)),
typeKey3, typeKey3,
ctx behavior(ctx.shard, Some(stopProbe.ref)),
StopPlz())) StopPlz()))
shardingRef3 ! ShardingEnvelope(s"test1", ReplyPlz(p.ref)) shardingRef3 ! ShardingEnvelope(s"test1", ReplyPlz(p.ref))
@ -281,9 +281,9 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
"fail if starting sharding for already used typeName, but with a different type" in { "fail if starting sharding for already used typeName, but with a different type" in {
// sharding has been already started with EntityTypeKey[TestProtocol]("envelope-shard") // sharding has been already started with EntityTypeKey[TestProtocol]("envelope-shard")
val ex = intercept[Exception] { val ex = intercept[Exception] {
sharding.start(ShardedEntity( sharding.start(Entity(
(shard, _) behaviorWithId(shard),
EntityTypeKey[IdTestProtocol]("envelope-shard"), EntityTypeKey[IdTestProtocol]("envelope-shard"),
ctx behaviorWithId(ctx.shard),
IdStopPlz())) IdStopPlz()))
} }
@ -347,9 +347,9 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
"EntityRef - AskTimeoutException" in { "EntityRef - AskTimeoutException" in {
val ignorantKey = EntityTypeKey[TestProtocol]("ignorant") val ignorantKey = EntityTypeKey[TestProtocol]("ignorant")
sharding.start(ShardedEntity( sharding.start(Entity(
_ Behaviors.ignore[TestProtocol],
ignorantKey, ignorantKey,
_ Behaviors.ignore[TestProtocol],
StopPlz())) StopPlz()))
val ref = sharding.entityRefFor(ignorantKey, "sloppy") val ref = sharding.entityRefFor(ignorantKey, "sloppy")

View file

@ -0,0 +1,100 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.cluster.sharding.typed
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
object HelloWorldPersistentEntityExample {
//#persistent-entity-usage
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.util.Timeout
class HelloWorldService(system: ActorSystem[_]) {
import system.executionContext
// registration at startup
private val sharding = ClusterSharding(system)
sharding.start(Entity(
typeKey = HelloWorld.entityTypeKey,
createBehavior = entityContext HelloWorld.persistentEntity(entityContext.entityId),
stopMessage = HelloWorld.Passivate))
private implicit val askTimeout: Timeout = Timeout(5.seconds)
def greet(worldId: String, whom: String): Future[Int] = {
val entityRef = sharding.entityRefFor(HelloWorld.entityTypeKey, worldId)
val greeting = entityRef ? HelloWorld.Greet(whom)
greeting.map(_.numberOfPeople)
}
}
//#persistent-entity-usage
//#persistent-entity
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.PersistentEntity
import akka.persistence.typed.scaladsl.Effect
object HelloWorld {
// Command
trait Command
final case class Greet(whom: String)(val replyTo: ActorRef[Greeting]) extends Command
case object Passivate extends Command
// Response
final case class Greeting(whom: String, numberOfPeople: Int)
// Event
final case class Greeted(whom: String)
// State
private final case class KnownPeople(names: Set[String]) {
def add(name: String): KnownPeople = copy(names = names + name)
def numberOfPeople: Int = names.size
}
private val commandHandler: (KnownPeople, Command) Effect[Greeted, KnownPeople] = {
(_, cmd)
cmd match {
case cmd: Greet greet(cmd)
case Passivate passivate()
}
}
private def greet(cmd: Greet): Effect[Greeted, KnownPeople] =
Effect.persist(Greeted(cmd.whom))
.thenRun(state cmd.replyTo ! Greeting(cmd.whom, state.numberOfPeople))
private def passivate(): Effect[Greeted, KnownPeople] =
Effect.stop
private val eventHandler: (KnownPeople, Greeted) KnownPeople = {
(state, evt) state.add(evt.whom)
}
val entityTypeKey: EntityTypeKey[Command] =
EntityTypeKey[Command]("HelloWorld")
def persistentEntity(entityId: String): Behavior[Command] = PersistentEntity(
entityTypeKey = entityTypeKey,
entityId = entityId,
emptyState = KnownPeople(Set.empty),
commandHandler,
eventHandler
)
}
//#persistent-entity
}

View file

@ -0,0 +1,57 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.cluster.sharding.typed
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object HelloWorldPersistentEntityExampleSpec {
val config = ConfigFactory.parseString(
"""
akka.actor.provider = cluster
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
""")
}
class HelloWorldPersistentEntityExampleSpec extends ScalaTestWithActorTestKit(HelloWorldPersistentEntityExampleSpec.config) with WordSpecLike {
import HelloWorldPersistentEntityExample.HelloWorld
import HelloWorldPersistentEntityExample.HelloWorld._
val sharding = ClusterSharding(system)
override def beforeAll(): Unit = {
super.beforeAll()
Cluster(system).manager ! Join(Cluster(system).selfMember.address)
sharding.start(Entity(
HelloWorld.entityTypeKey,
ctx HelloWorld.persistentEntity(ctx.entityId),
HelloWorld.Passivate
))
}
"HelloWorld example" must {
"sayHello" in {
val probe = createTestProbe[Greeting]()
val ref = ClusterSharding(system).entityRefFor(HelloWorld.entityTypeKey, "1")
ref ! Greet("Alice")(probe.ref)
probe.expectMessage(Greeting("Alice", 1))
ref ! Greet("Bob")(probe.ref)
probe.expectMessage(Greeting("Bob", 2))
}
}
}

View file

@ -6,9 +6,9 @@ package docs.akka.cluster.sharding.typed
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.ShardedEntity import akka.cluster.sharding.typed.scaladsl.Entity
import docs.akka.persistence.typed.BlogPostExample import docs.akka.persistence.typed.BlogPostExample
import docs.akka.persistence.typed.BlogPostExample.{ BlogCommand, PassivatePost } import docs.akka.persistence.typed.BlogPostExample.{ BlogCommand, PassivatePost }
@ -17,7 +17,6 @@ object ShardingCompileOnlySpec {
val system = ActorSystem(Behaviors.empty, "Sharding") val system = ActorSystem(Behaviors.empty, "Sharding")
//#sharding-extension //#sharding-extension
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.ShardingEnvelope import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
@ -50,9 +49,9 @@ object ShardingCompileOnlySpec {
//#start //#start
val TypeKey = EntityTypeKey[CounterCommand]("Counter") val TypeKey = EntityTypeKey[CounterCommand]("Counter")
val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.start(ShardedEntity( val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.start(Entity(
create = entityId counter(entityId, 0),
typeKey = TypeKey, typeKey = TypeKey,
createBehavior = ctx counter(ctx.entityId, 0),
stopMessage = GoodByeCounter)) stopMessage = GoodByeCounter))
//#start //#start
@ -69,9 +68,9 @@ object ShardingCompileOnlySpec {
//#persistence //#persistence
val BlogTypeKey = EntityTypeKey[BlogCommand]("BlogPost") val BlogTypeKey = EntityTypeKey[BlogCommand]("BlogPost")
ClusterSharding(system).start(ShardedEntity( ClusterSharding(system).start(Entity(
create = entityId behavior(entityId),
typeKey = BlogTypeKey, typeKey = BlogTypeKey,
createBehavior = ctx behavior(ctx.entityId),
stopMessage = PassivatePost)) stopMessage = PassivatePost))
//#persistence //#persistence
@ -103,9 +102,9 @@ object ShardingCompileOnlySpec {
} }
} }
sharding.start(ShardedEntity( sharding.start(Entity(
create = (shard, entityId) counter2(shard, entityId),
typeKey = TypeKey, typeKey = TypeKey,
createBehavior = ctx counter2(ctx.shard, ctx.entityId),
stopMessage = GoodByeCounter)) stopMessage = GoodByeCounter))
//#counter-passivate //#counter-passivate

View file

@ -64,24 +64,36 @@ Java
When using sharding entities can be moved to different nodes in the cluster. Persistence can be used to recover the state of When using sharding entities can be moved to different nodes in the cluster. Persistence can be used to recover the state of
an actor after it has moved. an actor after it has moved.
Taking the larger example from the @ref:[persistence documentation](persistence.md#larger-example) and making it into Akka Persistence is based on the single-writer principle, for a particular `persitenceId` only one persistent actor
a sharded entity is the same as for a non persistent behavior. The behavior: instance should be active. If multiple instances were to persist events at the same time, the events would be
interleaved and might not be interpreted correctly on replay. Cluster sharding is typically used together with
persistence to ensure that there is only one active entity for each `persistenceId` (`entityId`).
Here is an example of a persistent actor that is used as a sharded entity:
Scala Scala
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #behavior } : @@snip [HelloWorldPersistentEntityExample.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala) { #persistent-entity }
Java Java
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #behavior } : @@snip [HelloWorldPersistentEntityExample.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java) { #persistent-entity-import #persistent-entity }
To create the entity: Note that `PersistentEntity` is used in this example. Any `Behavior` can be used as a sharded entity actor,
but the combination of sharding and persistent actors is very common and therefore the `PersistentEntity`
@scala[factory]@java[class] is provided as convenience. It selects the `persistenceId` automatically from
the `EntityTypeKey` and `entityId` @java[constructor] parameters by using `EntityTypeKey.persistenceIdFrom`.
To initialize and use the entity:
Scala Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #persistence } : @@snip [HelloWorldPersistentEntityExample.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala) { #persistent-entity-usage }
Java Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #persistence } : @@snip [HelloWorldPersistentEntityExample.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java) { #persistent-entity-usage-import #persistent-entity-usage }
Sending messages to persistent entities is the same as if the entity wasn't persistent. The only difference is
when an entity is moved the state will be restored. In the above example @ref:[ask](interaction-patterns.md#outside-ask)
is used but `tell` or any of the other @ref:[Interaction Patterns](interaction-patterns.md) can be used.
Sending messages to entities is the same as the example above. The only difference is when an entity is moved the state will be restored.
See @ref:[persistence](persistence.md) for more details. See @ref:[persistence](persistence.md) for more details.
## Passivation ## Passivation
@ -92,7 +104,7 @@ the entity actors for example by defining receive timeout (`context.setReceiveTi
If a message is already enqueued to the entity when it stops itself the enqueued message If a message is already enqueued to the entity when it stops itself the enqueued message
in the mailbox will be dropped. To support graceful passivation without losing such in the mailbox will be dropped. To support graceful passivation without losing such
messages the entity actor can send `ClusterSharding.Passivate` to to the messages the entity actor can send `ClusterSharding.Passivate` to to the
@scala:[`ActorRef[ShardCommand]`]@java:[`ActorRef<ShardCommand>`] that was passed in to @scala[`ActorRef[ShardCommand]`]@java[`ActorRef<ShardCommand>`] that was passed in to
the factory method when creating the entity. The specified `handOffStopMessage` message the factory method when creating the entity. The specified `handOffStopMessage` message
will be sent back to the entity, which is then supposed to stop itself. Incoming messages will be sent back to the entity, which is then supposed to stop itself. Incoming messages
will be buffered by the `Shard` between reception of `Passivate` and termination of the will be buffered by the `Shard` between reception of `Passivate` and termination of the

View file

@ -169,8 +169,8 @@ The response adapting function is running in the receiving actor and can safely
* When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact * When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact
* Finding a good value for the timeout, especially when `ask` is triggers chained `ask`s in the receiving actor. You want a short timeout to be responsive and answer back to the requester, but at the same time you do not want to have many false positives * Finding a good value for the timeout, especially when `ask` is triggers chained `ask`s in the receiving actor. You want a short timeout to be responsive and answer back to the requester, but at the same time you do not want to have many false positives
<a id="outside-ask"></a>
## Request-Response with ask from outside the ActorSystem ## Request-Response with ask from outside an Actor
Some times you need to interact with actors from outside of the actor system, this can be done with fire-and-forget as described above or through another version of `ask` that returns a @scala[`Future[Response]`]@java[`CompletionStage<Response>`] that is either completed with a successful response or failed with a `TimeoutException` if there was no response within the specified timeout. Some times you need to interact with actors from outside of the actor system, this can be done with fire-and-forget as described above or through another version of `ask` that returns a @scala[`Future[Response]`]@java[`CompletionStage<Response>`] that is either completed with a successful response or failed with a `TimeoutException` if there was no response within the specified timeout.

View file

@ -130,8 +130,19 @@ Scala
Java Java
: @@snip [PersistentActorCompileOnyTest.java](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #behavior } : @@snip [PersistentActorCompileOnyTest.java](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #behavior }
The `PersistentBehavior` can then be run as with any plain typed actor as described in [typed actors documentation](actors-typed.md). ## Cluster Sharding and persistence
In a use case where the number of persistent actors needed are higher than what would fit in the memory of one node or
where resilience is important so that if a node crashes the persistent actors are quickly started on a new node and can
resume operations @ref:[Cluster Sharding](cluster-sharding.md) is an excellent fit to spread persistent actors over a
cluster and address them by id.
The `PersistentBehavior` can then be run as with any plain typed actor as described in [actors documentation](actors-typed.md),
but since Akka Persistence is based on the single-writer principle the persistent actors are typically used together
with Cluster Sharding. For a particular `persistenceId` only one persistent actor instance should be active at one time.
If multiple instances were to persist events at the same time, the events would be interleaved and might not be
interpreted correctly on replay. Cluster Sharding ensures that there is only one active entity for each id. The
@ref:[Cluster Sharding example](cluster-sharding.md#persistence-example) illustrates this common combination.
## Accessing the ActorContext ## Accessing the ActorContext

View file

@ -14,6 +14,11 @@ import akka.persistence.typed.ExpectingReply
object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing] object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
/**
* Factory methods for creating [[Effect]] directives.
*
* Not for user extension
*/
@DoNotInherit sealed class EffectFactories[Command, Event, State] { @DoNotInherit sealed class EffectFactories[Command, Event, State] {
/** /**
* Persist a single event * Persist a single event

View file

@ -7,7 +7,6 @@ package akka.persistence.typed.scaladsl
import akka.Done import akka.Done
import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.Behavior.DeferredBehavior
import akka.annotation.InternalApi
import akka.persistence._ import akka.persistence._
import akka.persistence.typed.EventAdapter import akka.persistence.typed.EventAdapter
import akka.persistence.typed.internal._ import akka.persistence.typed.internal._
@ -85,9 +84,14 @@ object PersistentBehavior {
} }
/** /**
* Not intended for user extension. * Further customization of the `PersistentBehavior` can be done with the methods defined here.
*
* Not for user extension
*/ */
@DoNotInherit trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command] { @DoNotInherit trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command] {
def persistenceId: PersistenceId
/** /**
* The `callback` function is called to notify the actor that the recovery process * The `callback` function is called to notify the actor that the recovery process
* is finished. * is finished.