Remove EventSourcedEntity, #27724

Move utils to construct PersistenceId

* Move from EntityTypeKey to PersistenceId
* Thereby no persistence dependencies in sharding
* Reference documentation PersistenceId and how to use with Sharding
* Add EntityTypeKey to EntityContext to make it "complete"
* One consequence of adding EntityTypeKey to EntityContext is that
  now requires additional message type parameter and then type inference
  for `init` (Entity) breaks down. Solved by using two parameter lists in
  Entity.apply, which is pretty nice anyway since the second parameter is
  a function.
* as bonus the dependency can be removed
This commit is contained in:
Patrik Nordwall 2019-09-18 07:28:07 +02:00
parent e74831d78b
commit 1b3a75b3f8
36 changed files with 479 additions and 513 deletions

View file

@ -39,7 +39,6 @@ import akka.event.LoggingAdapter
import akka.japi.function.{ Function => JFunction }
import akka.pattern.AskTimeoutException
import akka.pattern.PromiseActorRef
import akka.persistence.typed.PersistenceId
import akka.util.ByteString
import akka.util.Timeout
@ -79,44 +78,10 @@ import akka.util.Timeout
/**
* INTERNAL API
*/
@InternalApi private[akka] object EntityTypeKeyImpl {
/**
* Default separator character used for concatenating EntityTypeKey with entityId to construct unique persistenceId.
* This must be same as in Lagom's `scaladsl.PersistentEntity`, for compatibility. No separator is used
* in Lagom's `javadsl.PersistentEntity` so for compatibility with that the `""` separator must be defined
* `withEntityIdSeparator`.
*/
val EntityIdSeparator = "|"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class EntityTypeKeyImpl[T](
name: String,
messageClassName: String,
entityIdSeparator: String = EntityTypeKeyImpl.EntityIdSeparator)
@InternalApi private[akka] final case class EntityTypeKeyImpl[T](name: String, messageClassName: String)
extends javadsl.EntityTypeKey[T]
with scaladsl.EntityTypeKey[T] {
if (!entityIdSeparator.isEmpty && name.contains(entityIdSeparator))
throw new IllegalArgumentException(
s"EntityTypeKey.name [$name] contains [$entityIdSeparator] which is " +
"a reserved character")
override def persistenceIdFrom(entityId: String): PersistenceId = {
if (!entityIdSeparator.isEmpty && entityId.contains(entityIdSeparator))
throw new IllegalArgumentException(
s"entityId [$entityId] contains [$entityIdSeparator] which is " +
"a reserved character")
PersistenceId(name + entityIdSeparator + entityId)
}
override def withEntityIdSeparator(separator: String): EntityTypeKeyImpl[T] =
EntityTypeKeyImpl[T](name, messageClassName, separator)
override def toString: String = s"EntityTypeKey[$messageClassName]($name)"
}
@ -169,10 +134,8 @@ import akka.util.Timeout
import scala.compat.java8.OptionConverters._
init(
new scaladsl.Entity(
createBehavior = (ctx: EntityContext) =>
Behaviors.setup[M] { actorContext =>
entity.createBehavior(new javadsl.EntityContext[M](ctx.entityId, ctx.shard, actorContext.asJava))
},
createBehavior = (ctx: EntityContext[M]) =>
entity.createBehavior(new javadsl.EntityContext[M](entity.typeKey, ctx.entityId, ctx.shard)),
typeKey = entity.typeKey.asScala,
stopMessage = entity.stopMessage.asScala,
entityProps = entity.entityProps,
@ -182,7 +145,7 @@ import akka.util.Timeout
}
private def internalInit[M, E](
behavior: EntityContext => Behavior[M],
behavior: EntityContext[M] => Behavior[M],
entityProps: Props,
typeKey: scaladsl.EntityTypeKey[M],
stopMessage: Option[M],
@ -229,7 +192,7 @@ import akka.util.Timeout
}
val classicEntityPropsFactory: String => akka.actor.Props = { entityId =>
val behv = behavior(new EntityContext(entityId, shardCommandDelegator))
val behv = behavior(new EntityContext(typeKey, entityId, shardCommandDelegator))
PropsAdapter(poisonPillInterceptor(behv), entityProps)
}
classicSharding.internalStart(

View file

@ -15,13 +15,11 @@ import akka.actor.typed.Behavior
import akka.actor.typed.RecipientRef
import akka.actor.typed.Props
import akka.actor.typed.internal.InternalRecipientRef
import akka.actor.typed.javadsl.ActorContext
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
import akka.japi.function.{ Function => JFunction }
import akka.persistence.typed.PersistenceId
import com.github.ghik.silencer.silent
@FunctionalInterface
@ -207,9 +205,6 @@ object Entity {
* Defines how the entity should be created. Used in [[ClusterSharding#init]]. More optional
* settings can be defined using the `with` methods of the returned [[Entity]].
*
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent actors
* is very common and therefore the [[Entity.ofEventSourcedEntity]] is provided as convenience.
*
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param createBehavior Create the behavior for an entity given a [[EntityContext]] (includes entityId)
* @tparam M The type of message the entity accepts
@ -227,71 +222,6 @@ object Entity {
Optional.empty())
}
/**
* Defines how the [[EventSourcedEntity]] should be created. Used in [[ClusterSharding#init]]. Any [[Behavior]] can
* be used as a sharded entity actor, but the combination of sharding and persistent actors is very common
* and therefore this factory is provided as convenience.
*
* More optional settings can be defined using the `with` methods of the returned [[Entity]].
*
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param createPersistentEntity Create the `PersistentEntity` for an entity given a [[EntityContext]] (includes entityId)
* @tparam Command The type of message the entity accepts
*/
def ofEventSourcedEntity[Command, Event, State](
typeKey: EntityTypeKey[Command],
createPersistentEntity: JFunction[EntityContext[Command], EventSourcedEntity[Command, Event, State]])
: Entity[Command, ShardingEnvelope[Command]] = {
of(
typeKey,
new JFunction[EntityContext[Command], Behavior[Command]] {
override def apply(ctx: EntityContext[Command]): Behavior[Command] = {
val persistentEntity = createPersistentEntity(ctx)
if (persistentEntity.entityTypeKey != typeKey)
throw new IllegalArgumentException(
s"The [${persistentEntity.entityTypeKey}] of the PersistentEntity " +
s" [${persistentEntity.getClass.getName}] doesn't match expected $typeKey.")
persistentEntity
}
})
}
/**
* Defines how the [[EventSourcedEntityWithEnforcedReplies]] should be created. Used in [[ClusterSharding#init]]. Any [[Behavior]] can
* be used as a sharded entity actor, but the combination of sharding and persistent actors is very common
* and therefore this factory is provided as convenience.
*
* A [[EventSourcedEntityWithEnforcedReplies]] enforces that replies to commands are not forgotten.
* There will be compilation errors if the returned effect isn't a [[akka.persistence.typed.javadsl.ReplyEffect]], which can be
* created with `Effects().reply`, `Effects().noReply`, [[akka.persistence.typed.javadsl.Effect.thenReply]], or [[akka.persistence.typed.javadsl.Effect.thenNoReply]].
*
* More optional settings can be defined using the `with` methods of the returned [[Entity]].
*
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param createPersistentEntity Create the `PersistentEntity` for an entity given a [[EntityContext]] (includes entityId)
* @tparam Command The type of message the entity accepts
*/
def ofEventSourcedEntityWithEnforcedReplies[Command, Event, State](
typeKey: EntityTypeKey[Command],
createPersistentEntity: JFunction[
EntityContext[Command],
EventSourcedEntityWithEnforcedReplies[Command, Event, State]]): Entity[Command, ShardingEnvelope[Command]] = {
of(
typeKey,
new JFunction[EntityContext[Command], Behavior[Command]] {
override def apply(ctx: EntityContext[Command]): Behavior[Command] = {
val persistentEntity = createPersistentEntity(ctx)
if (persistentEntity.entityTypeKey != typeKey)
throw new IllegalArgumentException(
s"The [${persistentEntity.entityTypeKey}] of the PersistentEntity " +
s" [${persistentEntity.getClass.getName}] doesn't match expected $typeKey.")
persistentEntity
}
})
}
}
/**
@ -365,18 +295,29 @@ final class Entity[M, E] private (
}
/**
* Parameter to [[Entity.of]]
* Parameter to `createBehavior` function in [[Entity.of]].
*
* Cluster Sharding is often used together with [[akka.persistence.typed.javadsl.EventSourcedBehavior]]
* for the entities. See more considerations in [[akka.persistence.typed.PersistenceId]].
* The `PersistenceId` of the `EventSourcedBehavior` can typically be constructed with:
* {{{
* PersistenceId.of(entityContext.getEntityTypeKey().name(), entityContext.getEntityId())
* }}}
*
* @param entityTypeKey the key of the entity type
* @param entityId the business domain identifier of the entity
*/
final class EntityContext[M](
entityTypeKey: EntityTypeKey[M],
entityId: String,
shard: ActorRef[ClusterSharding.ShardCommand],
actorContext: ActorContext[M]) {
shard: ActorRef[ClusterSharding.ShardCommand]) {
def getEntityTypeKey: EntityTypeKey[M] = entityTypeKey
def getEntityId: String = entityId
def getShard: ActorRef[ClusterSharding.ShardCommand] = shard
def getActorContext: ActorContext[M] = actorContext
}
@silent // for unused msgClass to make class type explicit in the Java API. Not using @unused as the user is likely to see it
@ -408,26 +349,6 @@ object StartEntity {
*/
@InternalApi private[akka] def asScala: scaladsl.EntityTypeKey[T] = scaladslSelf
/**
* Constructs a [[PersistenceId]] from this `EntityTypeKey` and the given `entityId` by
* concatenating them with `|` separator.
*
* The `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
* in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
* you should use `""` as the separator in [[EntityTypeKey.withEntityIdSeparator]].
*/
def persistenceIdFrom(entityId: String): PersistenceId
/**
* Specify a custom separator for compatibility with old naming conventions. The separator is used between the
* `EntityTypeKey` and the `entityId` when constructing a `persistenceId` with [[EntityTypeKey.persistenceIdFrom]].
*
* The default `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
* in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
* you should use `""` as the separator here.
*/
def withEntityIdSeparator(separator: String): EntityTypeKey[T]
}
object EntityTypeKey {

View file

@ -1,81 +0,0 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed.javadsl
import java.util.Optional
import akka.actor.typed.BackoffSupervisorStrategy
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.javadsl.{ EventSourcedBehavior, EventSourcedBehaviorWithEnforcedReplies }
/**
* Any [[akka.actor.typed.Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent
* actors is very common and therefore this `PersistentEntity` class is provided as convenience.
*
* It is a [[EventSourcedBehavior]] and is implemented in the same way. It selects the `persistenceId`
* automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using
* [[EntityTypeKey.persistenceIdFrom]].
*/
abstract class EventSourcedEntity[Command, Event, State] private (
val entityTypeKey: EntityTypeKey[Command],
val entityId: String,
persistenceId: PersistenceId,
onPersistFailure: Optional[BackoffSupervisorStrategy])
extends EventSourcedBehavior[Command, Event, State](persistenceId, onPersistFailure) {
def this(entityTypeKey: EntityTypeKey[Command], entityId: String) = {
this(
entityTypeKey,
entityId,
persistenceId = entityTypeKey.persistenceIdFrom(entityId),
Optional.empty[BackoffSupervisorStrategy])
}
def this(entityTypeKey: EntityTypeKey[Command], entityId: String, onPersistFailure: BackoffSupervisorStrategy) = {
this(
entityTypeKey,
entityId,
persistenceId = entityTypeKey.persistenceIdFrom(entityId),
Optional.ofNullable(onPersistFailure))
}
}
/**
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent
* actors is very common and therefore this `PersistentEntity` class is provided as convenience.
*
* A [[EventSourcedEntityWithEnforcedReplies]] enforces that replies to commands are not forgotten.
* There will be compilation errors if the returned effect isn't a [[akka.persistence.typed.javadsl.ReplyEffect]], which can be
* created with `Effects().reply`, `Effects().noReply`, [[akka.persistence.typed.javadsl.Effect.thenReply]], or [[akka.persistence.typed.javadsl.Effect.thenNoReply]].
*
* It is a [[EventSourcedBehavior]] and is implemented in the same way. It selects the `persistenceId`
* automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using
* [[EntityTypeKey.persistenceIdFrom]].
*/
abstract class EventSourcedEntityWithEnforcedReplies[Command, Event, State] private (
val entityTypeKey: EntityTypeKey[Command],
val entityId: String,
persistenceId: PersistenceId,
onPersistFailure: Optional[BackoffSupervisorStrategy])
extends EventSourcedBehaviorWithEnforcedReplies[Command, Event, State](persistenceId, onPersistFailure) {
def this(entityTypeKey: EntityTypeKey[Command], entityId: String) = {
this(
entityTypeKey,
entityId,
persistenceId = entityTypeKey.persistenceIdFrom(entityId),
Optional.empty[BackoffSupervisorStrategy])
}
def this(entityTypeKey: EntityTypeKey[Command], entityId: String, onPersistFailure: BackoffSupervisorStrategy) = {
this(
entityTypeKey,
entityId,
persistenceId = entityTypeKey.persistenceIdFrom(entityId),
Optional.ofNullable(onPersistFailure))
}
}

View file

@ -24,7 +24,6 @@ import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.typed.internal.ClusterShardingImpl
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
import akka.cluster.sharding.ShardRegion.{ StartEntity => ClassicStartEntity }
import akka.persistence.typed.PersistenceId
object ClusterSharding extends ExtensionId[ClusterSharding] {
@ -212,17 +211,12 @@ object Entity {
* Defines how the entity should be created. Used in [[ClusterSharding#init]]. More optional
* settings can be defined using the `with` methods of the returned [[Entity]].
*
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent actors
* is very common and therefore [[EventSourcedEntity]] is provided as a convenience for creating such
* `EventSourcedBehavior`.
*
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param createBehavior Create the behavior for an entity given a [[EntityContext]] (includes entityId)
* @tparam M The type of message the entity accepts
*/
def apply[M](
typeKey: EntityTypeKey[M],
createBehavior: EntityContext => Behavior[M]): Entity[M, ShardingEnvelope[M]] =
def apply[M](typeKey: EntityTypeKey[M])(
createBehavior: EntityContext[M] => Behavior[M]): Entity[M, ShardingEnvelope[M]] =
new Entity(createBehavior, typeKey, None, Props.empty, None, None, None)
}
@ -230,7 +224,7 @@ object Entity {
* Defines how the entity should be created. Used in [[ClusterSharding#init]].
*/
final class Entity[M, E] private[akka] (
val createBehavior: EntityContext => Behavior[M],
val createBehavior: EntityContext[M] => Behavior[M],
val typeKey: EntityTypeKey[M],
val stopMessage: Option[M],
val entityProps: Props,
@ -278,7 +272,7 @@ final class Entity[M, E] private[akka] (
copy(allocationStrategy = Option(newAllocationStrategy))
private def copy(
createBehavior: EntityContext => Behavior[M] = createBehavior,
createBehavior: EntityContext[M] => Behavior[M] = createBehavior,
typeKey: EntityTypeKey[M] = typeKey,
stopMessage: Option[M] = stopMessage,
entityProps: Props = entityProps,
@ -290,9 +284,22 @@ final class Entity[M, E] private[akka] (
}
/**
* Parameter to [[Entity.apply]]
* Parameter to `createBehavior` function in [[Entity.apply]].
*
* Cluster Sharding is often used together with [[akka.persistence.typed.scaladsl.EventSourcedBehavior]]
* for the entities. See more considerations in [[akka.persistence.typed.PersistenceId]].
* The `PersistenceId` of the `EventSourcedBehavior` can typically be constructed with:
* {{{
* PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId)
* }}}
*
* @param entityTypeKey the key of the entity type
* @param entityId the business domain identifier of the entity
*/
final class EntityContext(val entityId: String, val shard: ActorRef[ClusterSharding.ShardCommand])
final class EntityContext[M](
val entityTypeKey: EntityTypeKey[M],
val entityId: String,
val shard: ActorRef[ClusterSharding.ShardCommand])
/** Allows starting a specific Sharded Entity by its entity identifier */
object StartEntity {
@ -319,25 +326,6 @@ object StartEntity {
*/
def name: String
/**
* Constructs a [[PersistenceId]] from this `EntityTypeKey` and the given `entityId` by
* concatenating them with `|` separator.
*
* The `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
* in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
* you should use `""` as the separator in [[EntityTypeKey.withEntityIdSeparator]].
*/
def persistenceIdFrom(entityId: String): PersistenceId
/**
* Specify a custom separator for compatibility with old naming conventions. The separator is used between the
* `EntityTypeKey` and the `entityId` when constructing a `persistenceId` with [[EntityTypeKey.persistenceIdFrom]].
*
* The default `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
* in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
* you should use `""` as the separator here.
*/
def withEntityIdSeparator(separator: String): EntityTypeKey[T]
}
object EntityTypeKey {

View file

@ -1,53 +0,0 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed.scaladsl
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplyEffect }
object EventSourcedEntity {
/**
* Create a `Behavior` for a persistent actor that is used with Cluster Sharding.
*
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent
* actors is very common and therefore this `PersistentEntity` is provided as convenience.
*
* It is a [[EventSourcedBehavior]] and is implemented in the same way. It selects the `persistenceId`
* automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using
* [[EntityTypeKey.persistenceIdFrom]].
*/
def apply[Command, Event, State](
entityTypeKey: EntityTypeKey[Command],
entityId: String,
emptyState: State,
commandHandler: (State, Command) => Effect[Event, State],
eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] =
EventSourcedBehavior(entityTypeKey.persistenceIdFrom(entityId), emptyState, commandHandler, eventHandler)
/**
* Create a `Behavior` for a persistent actor that is used with Cluster Sharding
* and enforces that replies to commands are not forgotten.
*
* Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with [[Effect.reply]], [[Effect.noReply]], [[Effect.thenReply]], or [[Effect.thenNoReply]].
*
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent
* actors is very common and therefore this `PersistentEntity` is provided as convenience.
*
* It is a [[EventSourcedBehavior]] and is implemented in the same way. It selects the `persistenceId`
* automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using
* [[EntityTypeKey.persistenceIdFrom]].
*/
def withEnforcedReplies[Command, Event, State](
entityTypeKey: EntityTypeKey[Command],
entityId: String,
emptyState: State,
commandHandler: (State, Command) => ReplyEffect[Event, State],
eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] =
EventSourcedBehavior.withEnforcedReplies(
entityTypeKey.persistenceIdFrom(entityId),
emptyState,
commandHandler,
eventHandler)
}

View file

@ -66,7 +66,7 @@ abstract class MultiDcClusterShardingSpec
"init sharding" in {
val sharding = ClusterSharding(typedSystem)
val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.init(Entity(typeKey, _ => multiDcPinger))
val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.init(Entity(typeKey)(_ => multiDcPinger))
val probe = TestProbe[Pong]
shardRegion ! ShardingEnvelope(entityId, Ping(probe.ref))
probe.expectMessage(max = 15.seconds, Pong(cluster.selfMember.dataCenter))
@ -93,7 +93,7 @@ abstract class MultiDcClusterShardingSpec
"be able to message cross dc via proxy" in {
runOn(first, second) {
val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).init(
Entity(typeKey, _ => multiDcPinger).withSettings(ClusterShardingSettings(typedSystem).withDataCenter("dc2")))
Entity(typeKey)(_ => multiDcPinger).withSettings(ClusterShardingSettings(typedSystem).withDataCenter("dc2")))
val probe = TestProbe[Pong]
proxy ! ShardingEnvelope(entityId, Ping(probe.ref))
probe.expectMessage(remainingOrDefault, Pong("dc2"))

View file

@ -11,10 +11,11 @@ import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Join;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.Effect;
import akka.persistence.typed.javadsl.EventHandler;
import akka.util.Timeout;
import akka.persistence.typed.javadsl.EventSourcedBehavior;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
@ -67,13 +68,15 @@ public class ClusterShardingPersistenceTest extends JUnitSuite {
}
}
static class TestPersistentEntity extends EventSourcedEntity<Command, String, String> {
static class TestPersistentEntity extends EventSourcedBehavior<Command, String, String> {
public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
EntityTypeKey.create(Command.class, "HelloWorld");
private final String entityId;
public TestPersistentEntity(String entityId) {
super(ENTITY_TYPE_KEY, entityId);
public TestPersistentEntity(String entityId, PersistenceId persistenceId) {
super(persistenceId);
this.entityId = entityId;
}
@Override
@ -100,7 +103,7 @@ public class ClusterShardingPersistenceTest extends JUnitSuite {
}
private Effect<String, String> getState(String state, Get cmd) {
cmd.replyTo.tell(entityId() + ":" + state);
cmd.replyTo.tell(entityId + ":" + state);
return Effect().none();
}
@ -126,9 +129,13 @@ public class ClusterShardingPersistenceTest extends JUnitSuite {
ClusterSharding sharding = ClusterSharding.get(testKit.system());
sharding.init(
Entity.ofEventSourcedEntity(
Entity.of(
TestPersistentEntity.ENTITY_TYPE_KEY,
entityContext -> new TestPersistentEntity(entityContext.getEntityId())));
entityContext ->
new TestPersistentEntity(
entityContext.getEntityId(),
PersistenceId.of(
entityContext.getEntityTypeKey().name(), entityContext.getEntityId()))));
_sharding = sharding;
}

View file

@ -7,8 +7,10 @@ package akka.cluster.sharding.typed.javadsl;
import akka.actor.testkit.typed.javadsl.LogCapturing;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Join;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.*;
import org.junit.ClassRule;
import org.junit.Rule;
@ -32,13 +34,17 @@ public class ShardingEventSourcedEntityWithEnforcedRepliesCompileOnlyTest {
}
static class TestPersistentEntityWithEnforcedReplies
extends EventSourcedEntityWithEnforcedReplies<Command, String, String> {
extends EventSourcedBehaviorWithEnforcedReplies<Command, String, String> {
public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
EntityTypeKey.create(Command.class, "HelloWorld");
public TestPersistentEntityWithEnforcedReplies(String entityId) {
super(ENTITY_TYPE_KEY, entityId);
public static Behavior<Command> create(String entityId, PersistenceId persistenceId) {
return new TestPersistentEntityWithEnforcedReplies(entityId, persistenceId);
}
private TestPersistentEntityWithEnforcedReplies(String entityId, PersistenceId persistenceId) {
super(persistenceId);
}
@Override
@ -78,9 +84,12 @@ public class ShardingEventSourcedEntityWithEnforcedRepliesCompileOnlyTest {
ClusterSharding sharding = ClusterSharding.get(testKit.system());
sharding.init(
Entity.ofEventSourcedEntityWithEnforcedReplies(
Entity.of(
TestPersistentEntityWithEnforcedReplies.ENTITY_TYPE_KEY,
entityContext ->
new TestPersistentEntityWithEnforcedReplies(entityContext.getEntityId())));
TestPersistentEntityWithEnforcedReplies.create(
entityContext.getEntityId(),
PersistenceId.of(
entityContext.getEntityTypeKey().name(), entityContext.getEntityId()))));
}
}

View file

@ -13,6 +13,7 @@ import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Join;
import akka.persistence.typed.PersistenceId;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
@ -54,8 +55,13 @@ public class AccountExampleTest extends JUnitSuite {
ClusterSharding sharding = ClusterSharding.get(testKit.system());
sharding.init(
Entity.ofEventSourcedEntityWithEnforcedReplies(
AccountEntity.ENTITY_TYPE_KEY, ctx -> AccountEntity.create(ctx.getEntityId())));
Entity.of(
AccountEntity.ENTITY_TYPE_KEY,
entityContext ->
AccountEntity.create(
entityContext.getEntityId(),
PersistenceId.of(
entityContext.getEntityTypeKey().name(), entityContext.getEntityId()))));
_sharding = sharding;
}
return _sharding;

View file

@ -6,11 +6,12 @@ package jdocs.akka.cluster.sharding.typed;
import akka.actor.typed.ActorRef;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandlerWithReply;
import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.EventHandlerBuilder;
import akka.persistence.typed.javadsl.EventSourcedBehaviorWithEnforcedReplies;
import akka.persistence.typed.javadsl.ReplyEffect;
import akka.serialization.jackson.CborSerializable;
import com.fasterxml.jackson.annotation.JsonCreator;
@ -21,14 +22,14 @@ import java.math.BigDecimal;
* Bank account example illustrating: - different state classes representing the lifecycle of the
* account - event handlers that delegate to methods in the state classes - command handlers that
* delegate to methods in the class - replies of various types, using
* EventSourcedEntityWithEnforcedReplies
* EventSourcedBehaviorWithEnforcedReplies
*/
public interface AccountExampleWithEventHandlersInState {
// #account-entity
// #withEnforcedReplies
public class AccountEntity
extends EventSourcedEntityWithEnforcedReplies<
extends EventSourcedBehaviorWithEnforcedReplies<
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> {
// #withEnforcedReplies
@ -179,12 +180,15 @@ public interface AccountExampleWithEventHandlersInState {
public static class ClosedAccount implements Account {}
public static AccountEntity create(String accountNumber) {
return new AccountEntity(accountNumber);
public static AccountEntity create(String accountNumber, PersistenceId persistenceId) {
return new AccountEntity(accountNumber, persistenceId);
}
private AccountEntity(String accountNumber) {
super(ENTITY_TYPE_KEY, accountNumber);
private final String accountNumber;
private AccountEntity(String accountNumber, PersistenceId persistenceId) {
super(persistenceId);
this.accountNumber = accountNumber;
}
@Override

View file

@ -6,11 +6,12 @@ package jdocs.akka.cluster.sharding.typed;
import akka.actor.typed.ActorRef;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandlerWithReply;
import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.EventHandlerBuilder;
import akka.persistence.typed.javadsl.EventSourcedBehaviorWithEnforcedReplies;
import akka.persistence.typed.javadsl.ReplyEffect;
import akka.serialization.jackson.CborSerializable;
import com.fasterxml.jackson.annotation.JsonCreator;
@ -21,13 +22,13 @@ import java.math.BigDecimal;
* Bank account example illustrating: - different state classes representing the lifecycle of the
* account - mutable state - event handlers that delegate to methods in the state classes - command
* handlers that delegate to methods in the EventSourcedBehavior class - replies of various types,
* using EventSourcedEntityWithEnforcedReplies
* using EventSourcedBehaviorWithEnforcedReplies
*/
public interface AccountExampleWithMutableState {
// #account-entity
public class AccountEntity
extends EventSourcedEntityWithEnforcedReplies<
extends EventSourcedBehaviorWithEnforcedReplies<
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> {
public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
@ -172,12 +173,15 @@ public interface AccountExampleWithMutableState {
public static class ClosedAccount implements Account {}
public static AccountEntity create(String accountNumber) {
return new AccountEntity(accountNumber);
public static AccountEntity create(String accountNumber, PersistenceId persistenceId) {
return new AccountEntity(accountNumber, persistenceId);
}
private AccountEntity(String accountNumber) {
super(ENTITY_TYPE_KEY, accountNumber);
private final String accountNumber;
private AccountEntity(String accountNumber, PersistenceId persistenceId) {
super(persistenceId);
this.accountNumber = accountNumber;
}
@Override

View file

@ -6,11 +6,12 @@ package jdocs.akka.cluster.sharding.typed;
import akka.actor.typed.ActorRef;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandlerWithReply;
import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.EventHandlerBuilder;
import akka.persistence.typed.javadsl.EventSourcedBehaviorWithEnforcedReplies;
import akka.persistence.typed.javadsl.ReplyEffect;
import akka.serialization.jackson.CborSerializable;
import com.fasterxml.jackson.annotation.JsonCreator;
@ -21,13 +22,13 @@ import java.math.BigDecimal;
* Bank account example illustrating: - different state classes representing the lifecycle of the
* account - null as emptyState - event handlers that delegate to methods in the state classes -
* command handlers that delegate to methods in the EventSourcedBehavior class - replies of various
* types, using EventSourcedEntityWithEnforcedReplies
* types, using EventSourcedBehaviorWithEnforcedReplies
*/
public interface AccountExampleWithNullState {
// #account-entity
public class AccountEntity
extends EventSourcedEntityWithEnforcedReplies<
extends EventSourcedBehaviorWithEnforcedReplies<
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> {
public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
@ -171,12 +172,15 @@ public interface AccountExampleWithNullState {
public static class ClosedAccount implements Account {}
public static AccountEntity create(String accountNumber) {
return new AccountEntity(accountNumber);
public static AccountEntity create(String accountNumber, PersistenceId persistenceId) {
return new AccountEntity(accountNumber, persistenceId);
}
private AccountEntity(String accountNumber) {
super(ENTITY_TYPE_KEY, accountNumber);
private final String accountNumber;
private AccountEntity(String accountNumber, PersistenceId persistenceId) {
super(persistenceId);
this.accountNumber = accountNumber;
}
@Override

View file

@ -12,6 +12,7 @@ import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Join;
import akka.persistence.typed.PersistenceId;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
@ -47,9 +48,13 @@ public class HelloWorldEventSourcedEntityExampleTest extends JUnitSuite {
ClusterSharding sharding = ClusterSharding.get(testKit.system());
sharding.init(
Entity.ofEventSourcedEntity(
Entity.of(
HelloWorld.ENTITY_TYPE_KEY,
ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId())));
entityContext ->
HelloWorld.create(
entityContext.getEntityId(),
PersistenceId.of(
entityContext.getEntityTypeKey().name(), entityContext.getEntityId()))));
_sharding = sharding;
}
return _sharding;

View file

@ -6,6 +6,7 @@ package jdocs.akka.cluster.sharding.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import java.time.Duration;
import java.util.Collections;
@ -14,8 +15,9 @@ import java.util.Set;
import java.util.concurrent.CompletionStage;
// #persistent-entity-import
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EventSourcedEntity;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.Effect;
import akka.persistence.typed.javadsl.EventHandler;
@ -25,6 +27,7 @@ import akka.persistence.typed.javadsl.EventHandler;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.persistence.typed.javadsl.EventSourcedBehavior;
import akka.serialization.jackson.CborSerializable;
import akka.util.Timeout;
import com.fasterxml.jackson.annotation.JsonCreator;
@ -44,10 +47,15 @@ public class HelloWorldPersistentEntityExample {
this.system = system;
sharding = ClusterSharding.get(system);
// registration at startup
sharding.init(
Entity.ofEventSourcedEntity(
Entity.of(
HelloWorld.ENTITY_TYPE_KEY,
ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId())));
entityContext ->
HelloWorld.create(
entityContext.getEntityId(),
PersistenceId.of(
entityContext.getEntityTypeKey().name(), entityContext.getEntityId()))));
}
// usage example
@ -64,7 +72,7 @@ public class HelloWorldPersistentEntityExample {
// #persistent-entity
public static class HelloWorld
extends EventSourcedEntity<HelloWorld.Command, HelloWorld.Greeted, HelloWorld.KnownPeople> {
extends EventSourcedBehavior<HelloWorld.Command, HelloWorld.Greeted, HelloWorld.KnownPeople> {
// Command
public interface Command extends CborSerializable {}
@ -124,8 +132,14 @@ public class HelloWorldPersistentEntityExample {
public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
EntityTypeKey.create(Command.class, "HelloWorld");
public HelloWorld(ActorContext<Command> ctx, String entityId) {
super(ENTITY_TYPE_KEY, entityId);
public static Behavior<Command> create(String entityId, PersistenceId persistenceId) {
return Behaviors.setup(context -> new HelloWorld(context, entityId, persistenceId));
}
private HelloWorld(
ActorContext<Command> context, String entityId, PersistenceId persistenceId) {
super(persistenceId);
context.getLog().info("Starting HelloWorld {}", entityId);
}
@Override

View file

@ -20,6 +20,7 @@ import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.persistence.typed.PersistenceId;
// #import
@ -204,7 +205,14 @@ interface ShardingCompileOnlyTest {
EntityTypeKey<BlogPostEntity.Command> blogTypeKey =
EntityTypeKey.create(BlogPostEntity.Command.class, "BlogPost");
sharding.init(Entity.of(blogTypeKey, ctx -> BlogPostEntity.create(ctx.getEntityId())));
sharding.init(
Entity.of(
blogTypeKey,
entityContext ->
BlogPostEntity.create(
entityContext.getEntityId(),
PersistenceId.of(
entityContext.getEntityTypeKey().name(), entityContext.getEntityId()))));
// #persistence
}
}

View file

@ -29,8 +29,10 @@ import akka.cluster.sharding.typed.scaladsl.ClusterSharding.ShardCommand
import akka.cluster.sharding.{ ClusterSharding => ClassicClusterSharding }
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
@ -60,7 +62,7 @@ object ClusterShardingPersistenceSpec {
case object UnstashAll extends Command
case object UnstashAllAndPassivate extends Command
val typeKey = EntityTypeKey[Command]("test")
val TypeKey = EntityTypeKey[Command]("test")
val lifecycleProbes = new ConcurrentHashMap[String, ActorRef[String]]
@ -78,9 +80,8 @@ object ClusterShardingPersistenceSpec {
// transient state (testing purpose)
var stashing = false
EventSourcedEntity[Command, String, String](
entityTypeKey = typeKey,
entityId = entityId,
EventSourcedBehavior[Command, String, String](
PersistenceId(TypeKey.name, entityId),
emptyState = "",
commandHandler = (state, cmd) =>
cmd match {
@ -164,7 +165,7 @@ class ClusterShardingPersistenceSpec
val regionStateProbe = TestProbe[CurrentShardRegionState]()
val classicRegion = ClassicClusterSharding(system.toClassic)
regionStateProbe.awaitAssert {
classicRegion.shardRegion(typeKey.name).tell(GetShardRegionState, regionStateProbe.ref.toClassic)
classicRegion.shardRegion(TypeKey.name).tell(GetShardRegionState, regionStateProbe.ref.toClassic)
regionStateProbe.receiveMessage().shards.foreach { shardState =>
shardState.entityIds should not contain entityId
}
@ -173,7 +174,7 @@ class ClusterShardingPersistenceSpec
"Typed cluster sharding with persistent actor" must {
ClusterSharding(system).init(Entity(typeKey, ctx => persistentEntity(ctx.entityId, ctx.shard)))
ClusterSharding(system).init(Entity(TypeKey)(ctx => persistentEntity(ctx.entityId, ctx.shard)))
Cluster(system).manager ! Join(Cluster(system).selfMember.address)
@ -181,7 +182,7 @@ class ClusterShardingPersistenceSpec
val entityId = nextEntityId()
val p = TestProbe[String]()
val ref = ClusterSharding(system).entityRefFor(typeKey, entityId)
val ref = ClusterSharding(system).entityRefFor(TypeKey, entityId)
ref ! Add("a")
ref ! Add("b")
ref ! Add("c")
@ -193,7 +194,7 @@ class ClusterShardingPersistenceSpec
val entityId = nextEntityId()
val p = TestProbe[String]()
val ref = ClusterSharding(system).entityRefFor(typeKey, entityId)
val ref = ClusterSharding(system).entityRefFor(TypeKey, entityId)
val done1 = ref ? AddWithConfirmation("a")
done1.futureValue should ===(Done)
@ -210,7 +211,7 @@ class ClusterShardingPersistenceSpec
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val p1 = TestProbe[Done]()
val ref = ClusterSharding(system).entityRefFor(typeKey, entityId)
val ref = ClusterSharding(system).entityRefFor(TypeKey, entityId)
(1 to 10).foreach { n =>
ref ! PassivateAndPersist(n.toString)(p1.ref)
@ -233,7 +234,7 @@ class ClusterShardingPersistenceSpec
val lifecycleProbe = TestProbe[String]()
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId)
val entityRef = ClusterSharding(system).entityRefFor(TypeKey, entityId)
// this will wakeup the entity, and complete the entityActorRefPromise
entityRef ! AddWithConfirmation("a")(addProbe.ref)
addProbe.expectMessage(Done)
@ -281,7 +282,7 @@ class ClusterShardingPersistenceSpec
val lifecycleProbe = TestProbe[String]()
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId)
val entityRef = ClusterSharding(system).entityRefFor(TypeKey, entityId)
// this will wakeup the entity, and complete the entityActorRefPromise
entityRef ! AddWithConfirmation("a")(addProbe.ref)
addProbe.expectMessage(Done)
@ -333,7 +334,7 @@ class ClusterShardingPersistenceSpec
val lifecycleProbe = TestProbe[String]()
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId)
val entityRef = ClusterSharding(system).entityRefFor(TypeKey, entityId)
// this will wakeup the entity, and complete the entityActorRefPromise
entityRef ! AddWithConfirmation("a")(addProbe.ref)
addProbe.expectMessage(Done)
@ -370,7 +371,7 @@ class ClusterShardingPersistenceSpec
val lifecycleProbe = TestProbe[String]()
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId)
val entityRef = ClusterSharding(system).entityRefFor(TypeKey, entityId)
val ignoreFirstEchoProbe = TestProbe[String]()
val echoProbe = TestProbe[String]()
// first echo will wakeup the entity, and complete the entityActorRefPromise
@ -405,7 +406,7 @@ class ClusterShardingPersistenceSpec
val lifecycleProbe = TestProbe[String]()
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId)
val entityRef = ClusterSharding(system).entityRefFor(TypeKey, entityId)
val addProbe = TestProbe[Done]()
val ignoreFirstEchoProbe = TestProbe[String]()
val echoProbe = TestProbe[String]()
@ -454,7 +455,7 @@ class ClusterShardingPersistenceSpec
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val p1 = TestProbe[Done]()
val ref = ClusterSharding(system).entityRefFor(typeKey, entityId)
val ref = ClusterSharding(system).entityRefFor(TypeKey, entityId)
ref ! Add("1")
ref ! Add("2")
ref ! BeginStashingAddCommands
@ -480,7 +481,7 @@ class ClusterShardingPersistenceSpec
val lifecycleProbe = TestProbe[String]()
lifecycleProbes.put(entityId, lifecycleProbe.ref)
val ref = ClusterSharding(system).entityRefFor(typeKey, entityId)
val ref = ClusterSharding(system).entityRefFor(TypeKey, entityId)
ref ! Add("1")
lifecycleProbe.expectMessage(max = 10.seconds, "recoveryCompleted:")
ref ! BeginStashingAddCommands

View file

@ -199,13 +199,13 @@ class ClusterShardingSpec
}
private val shardingRefSystem1WithEnvelope: ActorRef[ShardingEnvelope[TestProtocol]] =
sharding.init(Entity(typeKeyWithEnvelopes, ctx => behavior(ctx.shard)).withStopMessage(StopPlz()))
sharding.init(Entity(typeKeyWithEnvelopes)(ctx => behavior(ctx.shard)).withStopMessage(StopPlz()))
private val shardingRefSystem2WithEnvelope: ActorRef[ShardingEnvelope[TestProtocol]] =
sharding2.init(Entity(typeKeyWithEnvelopes, ctx => behavior(ctx.shard)).withStopMessage(StopPlz()))
sharding2.init(Entity(typeKeyWithEnvelopes)(ctx => behavior(ctx.shard)).withStopMessage(StopPlz()))
private val shardingRefSystem1WithoutEnvelope: ActorRef[IdTestProtocol] = sharding.init(
Entity(typeKeyWithoutEnvelopes, _ => behaviorWithId())
Entity(typeKeyWithoutEnvelopes)(_ => behaviorWithId())
.withMessageExtractor(ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
case IdReplyPlz(id, _) => id
case IdWhoAreYou(id, _) => id
@ -214,7 +214,7 @@ class ClusterShardingSpec
.withStopMessage(IdStopPlz()))
private val shardingRefSystem2WithoutEnvelope: ActorRef[IdTestProtocol] = sharding2.init(
Entity(typeKeyWithoutEnvelopes, _ => behaviorWithId())
Entity(typeKeyWithoutEnvelopes)(_ => behaviorWithId())
.withMessageExtractor(idTestProtocolMessageExtractor)
.withStopMessage(IdStopPlz()))
@ -277,7 +277,7 @@ class ClusterShardingSpec
val typeKey3 = EntityTypeKey[TestProtocol]("passivate-test")
val shardingRef3: ActorRef[ShardingEnvelope[TestProtocol]] =
sharding.init(Entity(typeKey3, ctx => behavior(ctx.shard, Some(stopProbe.ref))).withStopMessage(StopPlz()))
sharding.init(Entity(typeKey3)(ctx => behavior(ctx.shard, Some(stopProbe.ref))).withStopMessage(StopPlz()))
shardingRef3 ! ShardingEnvelope(s"test1", ReplyPlz(p.ref))
p.expectMessage("Hello!")
@ -296,7 +296,7 @@ class ClusterShardingSpec
val typeKey4 = EntityTypeKey[TestProtocol]("passivate-test-poison")
val shardingRef4: ActorRef[ShardingEnvelope[TestProtocol]] =
sharding.init(Entity(typeKey4, ctx => behavior(ctx.shard, Some(stopProbe.ref))))
sharding.init(Entity(typeKey4)(ctx => behavior(ctx.shard, Some(stopProbe.ref))))
// no StopPlz stopMessage
shardingRef4 ! ShardingEnvelope(s"test4", ReplyPlz(p.ref))
@ -314,7 +314,7 @@ class ClusterShardingSpec
// sharding has been already initialized with EntityTypeKey[TestProtocol]("envelope-shard")
val ex = intercept[Exception] {
sharding.init(
Entity(EntityTypeKey[IdTestProtocol]("envelope-shard"), _ => behaviorWithId()).withStopMessage(IdStopPlz()))
Entity(EntityTypeKey[IdTestProtocol]("envelope-shard"))(_ => behaviorWithId()).withStopMessage(IdStopPlz()))
}
ex.getMessage should include("already initialized")
@ -385,7 +385,7 @@ class ClusterShardingSpec
"EntityRef - AskTimeoutException" in {
val ignorantKey = EntityTypeKey[TestProtocol]("ignorant")
sharding.init(Entity(ignorantKey, _ => Behaviors.ignore[TestProtocol]).withStopMessage(StopPlz()))
sharding.init(Entity(ignorantKey)(_ => Behaviors.ignore[TestProtocol]).withStopMessage(StopPlz()))
val ref = sharding.entityRefFor(ignorantKey, "sloppy")

View file

@ -40,7 +40,7 @@ class ClusterShardingStateSpec
probe.expectMessage(CurrentShardRegionState(Set()))
val shardingRef: ActorRef[IdTestProtocol] = sharding.init(
Entity(typeKey, _ => ClusterShardingSpec.behaviorWithId())
Entity(typeKey)(_ => ClusterShardingSpec.behaviorWithId())
.withStopMessage(IdStopPlz())
.withMessageExtractor(idTestProtocolMessageExtractor))

View file

@ -1,54 +0,0 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed.scaladsl
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.persistence.typed.PersistenceId
import org.scalatest.Matchers
import org.scalatest.WordSpec
class EntityTypeKeySpec extends WordSpec with Matchers with LogCapturing {
"EntityTypeKey" must {
"use | as default entityIdSeparator for compatibility with Lagom's scaladsl" in {
EntityTypeKey[String]("MyType").persistenceIdFrom("abc") should ===(PersistenceId("MyType|abc"))
}
"support custom entityIdSeparator for compatibility with Lagom's javadsl" in {
EntityTypeKey[String]("MyType").withEntityIdSeparator("").persistenceIdFrom("abc") should ===(
PersistenceId("MyTypeabc"))
}
"support custom entityIdSeparator for compatibility with other naming" in {
EntityTypeKey[String]("MyType").withEntityIdSeparator("#/#").persistenceIdFrom("abc") should ===(
PersistenceId("MyType#/#abc"))
}
"not allow | in name because it's the default entityIdSeparator" in {
intercept[IllegalArgumentException] {
EntityTypeKey[String]("Invalid | name")
}
}
"not allow custom separator in name" in {
intercept[IllegalArgumentException] {
EntityTypeKey[String]("Invalid name").withEntityIdSeparator(" ")
}
}
"not allow | in entityId because it's the default entityIdSeparator" in {
intercept[IllegalArgumentException] {
EntityTypeKey[String]("SomeType").persistenceIdFrom("A|B")
}
}
"not allow custom separator in entityId" in {
intercept[IllegalArgumentException] {
EntityTypeKey[String]("SomeType").withEntityIdSeparator("#").persistenceIdFrom("A#B")
}
}
}
}

View file

@ -13,6 +13,7 @@ import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
@ -43,7 +44,9 @@ class AccountExampleSpec
super.beforeAll()
Cluster(system).manager ! Join(Cluster(system).selfMember.address)
sharding.init(Entity(AccountEntity.TypeKey, ctx => AccountEntity(ctx.entityId)))
sharding.init(Entity(AccountEntity.TypeKey) { entityContext =>
AccountEntity(entityContext.entityId, PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId))
})
}
"Account example" must {

View file

@ -7,8 +7,9 @@ package docs.akka.cluster.sharding.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.serialization.jackson.CborSerializable
/**
@ -133,10 +134,9 @@ object AccountExampleWithCommandHandlersInState {
val TypeKey: EntityTypeKey[Command[_]] =
EntityTypeKey[Command[_]]("Account")
def apply(accountNumber: String): Behavior[Command[_]] = {
EventSourcedEntity.withEnforcedReplies[Command[_], Event, Account](
TypeKey,
accountNumber,
def apply(persistenceId: PersistenceId): Behavior[Command[_]] = {
EventSourcedBehavior.withEnforcedReplies[Command[_], Event, Account](
persistenceId,
EmptyAccount,
(state, cmd) => state.applyCommand(cmd),
(state, event) => state.applyEvent(event))

View file

@ -7,8 +7,9 @@ package docs.akka.cluster.sharding.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.ReplyEffect
import akka.serialization.jackson.CborSerializable
@ -95,40 +96,41 @@ object AccountExampleWithEventHandlersInState {
// to generate the stub with types for the command and event handlers.
//#withEnforcedReplies
def apply(accountNumber: String): Behavior[Command[_]] = {
EventSourcedEntity.withEnforcedReplies(TypeKey, accountNumber, EmptyAccount, commandHandler, eventHandler)
def apply(accountNumber: String, persistenceId: PersistenceId): Behavior[Command[_]] = {
EventSourcedBehavior.withEnforcedReplies(persistenceId, EmptyAccount, commandHandler(accountNumber), eventHandler)
}
//#withEnforcedReplies
private val commandHandler: (Account, Command[_]) => ReplyEffect[Event, Account] = { (state, cmd) =>
state match {
case EmptyAccount =>
cmd match {
case c: CreateAccount => createAccount(c)
case _ => Effect.unhandled.thenNoReply() // CreateAccount before handling any other commands
}
private def commandHandler(accountNumber: String): (Account, Command[_]) => ReplyEffect[Event, Account] = {
(state, cmd) =>
state match {
case EmptyAccount =>
cmd match {
case c: CreateAccount => createAccount(c)
case _ => Effect.unhandled.thenNoReply() // CreateAccount before handling any other commands
}
case acc @ OpenedAccount(_) =>
cmd match {
case c: Deposit => deposit(c)
case c: Withdraw => withdraw(acc, c)
case c: GetBalance => getBalance(acc, c)
case c: CloseAccount => closeAccount(acc, c)
case c: CreateAccount => Effect.reply(c.replyTo)(Rejected("Account is already created"))
}
case acc @ OpenedAccount(_) =>
cmd match {
case c: Deposit => deposit(c)
case c: Withdraw => withdraw(acc, c)
case c: GetBalance => getBalance(acc, c)
case c: CloseAccount => closeAccount(acc, c)
case c: CreateAccount => Effect.reply(c.replyTo)(Rejected(s"Account $accountNumber is already created"))
}
case ClosedAccount =>
cmd match {
case c @ (_: Deposit | _: Withdraw) =>
Effect.reply(c.replyTo)(Rejected("Account is closed"))
case GetBalance(replyTo) =>
Effect.reply(replyTo)(CurrentBalance(Zero))
case CloseAccount(replyTo) =>
Effect.reply(replyTo)(Rejected("Account is already closed"))
case CreateAccount(replyTo) =>
Effect.reply(replyTo)(Rejected("Account is already created"))
}
}
case ClosedAccount =>
cmd match {
case c @ (_: Deposit | _: Withdraw) =>
Effect.reply(c.replyTo)(Rejected(s"Account $accountNumber is closed"))
case GetBalance(replyTo) =>
Effect.reply(replyTo)(CurrentBalance(Zero))
case CloseAccount(replyTo) =>
Effect.reply(replyTo)(Rejected(s"Account $accountNumber is already closed"))
case CreateAccount(replyTo) =>
Effect.reply(replyTo)(Rejected(s"Account $accountNumber is already closed"))
}
}
}
private val eventHandler: (Account, Event) => Account = { (state, event) =>

View file

@ -7,9 +7,9 @@ package docs.akka.cluster.sharding.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.serialization.jackson.CborSerializable
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.serialization.jackson.CborSerializable
/**
@ -118,10 +118,9 @@ object AccountExampleWithOptionState {
val TypeKey: EntityTypeKey[Command[_]] =
EntityTypeKey[Command[_]]("Account")
def apply(accountNumber: String): Behavior[Command[_]] = {
EventSourcedEntity.withEnforcedReplies[Command[_], Event, Option[Account]](
TypeKey,
accountNumber,
def apply(persistenceId: PersistenceId): Behavior[Command[_]] = {
EventSourcedBehavior.withEnforcedReplies[Command[_], Event, Option[Account]](
persistenceId,
None,
(state, cmd) =>
state match {

View file

@ -10,6 +10,7 @@ import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
@ -39,14 +40,16 @@ class HelloWorldEventSourcedEntityExampleSpec
super.beforeAll()
Cluster(system).manager ! Join(Cluster(system).selfMember.address)
sharding.init(Entity(HelloWorld.entityTypeKey, ctx => HelloWorld.persistentEntity(ctx.entityId)))
sharding.init(Entity(HelloWorld.TypeKey) { entityContext =>
HelloWorld(entityContext.entityId, PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId))
})
}
"HelloWorld example" must {
"sayHello" in {
val probe = createTestProbe[Greeting]()
val ref = ClusterSharding(system).entityRefFor(HelloWorld.entityTypeKey, "1")
val ref = ClusterSharding(system).entityRefFor(HelloWorld.TypeKey, "1")
ref ! Greet("Alice")(probe.ref)
probe.expectMessage(Greeting("Alice", 1))
ref ! Greet("Bob")(probe.ref)

View file

@ -9,6 +9,9 @@ import scala.concurrent.duration._
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.serialization.jackson.CborSerializable
object HelloWorldPersistentEntityExample {
@ -21,18 +24,17 @@ object HelloWorldPersistentEntityExample {
class HelloWorldService(system: ActorSystem[_]) {
import system.executionContext
// registration at startup
private val sharding = ClusterSharding(system)
sharding.init(
Entity(
typeKey = HelloWorld.entityTypeKey,
createBehavior = entityContext => HelloWorld.persistentEntity(entityContext.entityId)))
// registration at startup
sharding.init(Entity(typeKey = HelloWorld.TypeKey) { entityContext =>
HelloWorld(entityContext.entityId, PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId))
})
private implicit val askTimeout: Timeout = Timeout(5.seconds)
def greet(worldId: String, whom: String): Future[Int] = {
val entityRef = sharding.entityRefFor(HelloWorld.entityTypeKey, worldId)
val entityRef = sharding.entityRefFor(HelloWorld.TypeKey, worldId)
val greeting = entityRef ? HelloWorld.Greet(whom)
greeting.map(_.numberOfPeople)
}
@ -43,7 +45,6 @@ object HelloWorldPersistentEntityExample {
//#persistent-entity
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity
import akka.persistence.typed.scaladsl.Effect
object HelloWorld {
@ -77,16 +78,15 @@ object HelloWorldPersistentEntityExample {
state.add(evt.whom)
}
val entityTypeKey: EntityTypeKey[Command] =
val TypeKey: EntityTypeKey[Command] =
EntityTypeKey[Command]("HelloWorld")
def persistentEntity(entityId: String): Behavior[Command] =
EventSourcedEntity(
entityTypeKey = entityTypeKey,
entityId = entityId,
emptyState = KnownPeople(Set.empty),
commandHandler,
eventHandler)
def apply(entityId: String, persistenceId: PersistenceId): Behavior[Command] = {
Behaviors.setup { context =>
context.log.info("Starting HelloWorld {}", entityId)
EventSourcedBehavior(persistenceId, emptyState = KnownPeople(Set.empty), commandHandler, eventHandler)
}
}
}
//#persistent-entity

View file

@ -5,9 +5,11 @@
package docs.akka.cluster.sharding.typed
import scala.concurrent.duration._
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.persistence.typed.PersistenceId
import com.github.ghik.silencer.silent
import docs.akka.persistence.typed.BlogPostEntity
import docs.akka.persistence.typed.BlogPostEntity.Command
@ -55,7 +57,7 @@ object ShardingCompileOnlySpec {
val TypeKey = EntityTypeKey[Counter.Command]("Counter")
val shardRegion: ActorRef[ShardingEnvelope[Counter.Command]] =
sharding.init(Entity(typeKey = TypeKey, createBehavior = ctx => Counter(ctx.entityId)))
sharding.init(Entity(TypeKey)(createBehavior = entityContext => Counter(entityContext.entityId)))
//#init
//#send
@ -70,7 +72,9 @@ object ShardingCompileOnlySpec {
//#persistence
val BlogTypeKey = EntityTypeKey[Command]("BlogPost")
ClusterSharding(system).init(Entity(typeKey = BlogTypeKey, createBehavior = ctx => BlogPostEntity(ctx.entityId)))
ClusterSharding(system).init(Entity(BlogTypeKey) { entityContext =>
BlogPostEntity(entityContext.entityId, PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId))
})
//#persistence
}
@ -115,9 +119,8 @@ object ShardingCompileOnlySpec {
//#counter-passivate-init
val TypeKey = EntityTypeKey[Counter.Command]("Counter")
ClusterSharding(system).init(
Entity(typeKey = TypeKey, createBehavior = ctx => Counter(ctx.shard, ctx.entityId))
.withStopMessage(Counter.GoodByeCounter))
ClusterSharding(system).init(Entity(TypeKey)(createBehavior = entityContext =>
Counter(entityContext.shard, entityContext.entityId)).withStopMessage(Counter.GoodByeCounter))
//#counter-passivate-init
}

View file

@ -508,9 +508,6 @@ Akka Typed APIs are still marked as [may change](../common/may-change.md) and a
made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible changes are:
* `Behaviors.intercept` now takes a factory function for the interceptor.
* Factory method `Entity.ofPersistentEntity` is renamed to `Entity.ofEventSourcedEntity` in the Java API for Akka Cluster Sharding Typed.
* New abstract class `EventSourcedEntityWithEnforcedReplies` in Java API for Akka Cluster Sharding Typed and corresponding factory method `Entity.ofEventSourcedEntityWithEnforcedReplies` to ease the creation of `EventSourcedBehavior` with enforced replies.
* New method `EventSourcedEntity.withEnforcedReplies` added to Scala API to ease the creation of `EventSourcedBehavior` with enforced replies.
* `ActorSystem.scheduler` previously gave access to the classic `akka.actor.Scheduler` but now returns a typed specific `akka.actor.typed.Scheduler`.
Additionally `schedule` method has been replaced by `scheduleWithFixedDelay` and `scheduleAtFixedRate`. Actors that needs to schedule tasks should
prefer `Behaviors.withTimers`.
@ -539,6 +536,12 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible
* `GetDataDeleted` and `UpdateDataDeleted` introduced as described in @ref[DataDeleted](#datadeleted).
* `SubscribeResponse` introduced in `Subscribe` because the responses can be both `Changed` and `Deleted`.
* `ReplicationDeleteFailure` renamed to `DeleteFailure`.
* `EventSourcedEntity` removed in favor using plain `EventSourcedBehavior` because the alternative way was
causing more confusion than adding value. Construction of `PersistentId` for the `EventSourcedBehavior` is
facilitated by factory methods in `PersistenceId`.
* `akka.cluster.sharding.typed.scaladsl.Entity.apply` changed to use two parameter lists because the new
`EntityContext.entityTypeKey` required additional type parameter that is inferred better with a secondary
parameter list.
* `EventSourcedBehavior.withEnforcedReplies` signature changed. Command is not required to extend `ExpectingReply` anymore. `ExpectingReply` has been removed therefore.
#### Akka Typed Stream API changes

View file

@ -89,10 +89,10 @@ Java
When using sharding entities can be moved to different nodes in the cluster. Persistence can be used to recover the state of
an actor after it has moved.
Akka Persistence is based on the single-writer principle, for a particular `persitenceId` only one persistent actor
Akka Persistence is based on the single-writer principle, for a particular `PersistenceId` only one persistent actor
instance should be active. If multiple instances were to persist events at the same time, the events would be
interleaved and might not be interpreted correctly on replay. Cluster sharding is typically used together with
persistence to ensure that there is only one active entity for each `persistenceId` (`entityId`).
interleaved and might not be interpreted correctly on replay. Cluster Sharding is typically used together with
persistence to ensure that there is only one active entity for each `PersistenceId` (`entityId`).
Here is an example of a persistent actor that is used as a sharded entity:
@ -102,11 +102,6 @@ Scala
Java
: @@snip [HelloWorldPersistentEntityExample.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java) { #persistent-entity-import #persistent-entity }
Note that `EventSourcedEntity` is used in this example. Any `Behavior` can be used as a sharded entity actor,
but the combination of sharding and persistent actors is very common and therefore the `EventSourcedEntity`
@scala[factory]@java[class] is provided as convenience. It selects the `persistenceId` automatically from
the `EntityTypeKey` and `entityId` @java[constructor] parameters by using `EntityTypeKey.persistenceIdFrom`.
To initialize and use the entity:
Scala
@ -115,6 +110,11 @@ Scala
Java
: @@snip [HelloWorldPersistentEntityExample.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java) { #persistent-entity-usage-import #persistent-entity-usage }
Note how an unique @apidoc[akka.persistence.typed.PersistenceId] can be constructed from the `EntityTypeKey` and the `entityId`
provided by the @apidoc[typed.*.EntityContext] in the factory function for the `Behavior`. This is a typical way
of defining the `PersistenceId` but formats are possible, as described in the
@ref:[PersistenceId section](persistence.md#persistenceid).
Sending messages to persistent entities is the same as if the entity wasn't persistent. The only difference is
when an entity is moved the state will be restored. In the above example @ref:[ask](interaction-patterns.md#outside-ask)
is used but `tell` or any of the other @ref:[Interaction Patterns](interaction-patterns.md) can be used.

View file

@ -380,7 +380,7 @@ Links to reference documentation:
The correspondence of the classic `PersistentActor` is @scala[`akka.persistence.typed.scaladsl.EventSourcedBehavior`]@java[`akka.persistence.typed.javadsl.EventSourcedBehavior`].
The Typed API is much more guided to facilitate event sourcing best practises. It also has tighter integration with
Cluster Sharding via `EventSourcedEntity`.
Cluster Sharding.
Links to reference documentation:

View file

@ -58,6 +58,36 @@ based on the events.
Next we'll discuss each of these in detail.
### PersistenceId
The @apidoc[akka.persistence.typed.PersistenceId] is the stable unique identifier for the persistent actor in the backend
event journal and snapshot store.
@ref:[Cluster Sharding](cluster-sharding.md) is typically used together with `EventSourcedBehavior` to ensure
that there is only one active entity for each `PersistenceId` (`entityId`).
The `entityId` in Cluster Sharding is the business domain identifier of the entity. The `entityId` might not
be unique enough to be used as the `PersistenceId` by itself. For example two different types of
entities may have the same `entityId`. To create a unique `PersistenceId` the `entityId` is can be prefixed
with a stable name of the entity type, which typically is the same as the `EntityTypeKey.name` that
is used in Cluster Sharding. There are @scala[`PersistenceId.apply`]@java[`PersistenceId.of`] factory methods
to help with constructing such `PersistenceId` from a `entityTypeHint` and `entityId`.
The default separator when concatenating the `entityTypeHint` and `entityId` is `|`, but a custom separator
is supported.
@@@ note
The `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
you should use `""` as the separator.
@@@
The @ref:[Persistence example in the Cluster Sharding documentation](cluster-sharding.md#persistence-example)
illustrates how to construct the `PersistenceId` from the `entityTypeKey` and `entityId` provided by the
`EntityContext`.
### Command handler
The command handler is a function with 2 parameters, the current `State` and the incoming `Command`.

View file

@ -4,6 +4,128 @@
package akka.persistence.typed
object PersistenceId {
/**
* Default separator character used for concatenating a `typeHint` with `entityId` to construct unique persistenceId.
* This must be same as in Lagom's `scaladsl.PersistentEntity`, for compatibility. No separator is used
* in Lagom's `javadsl.PersistentEntity` so for compatibility with that the `""` separator must be used instead.
*/
val DefaultSeparator = "|"
/**
* Constructs a [[PersistenceId]] from the given `entityTypeHint` and `entityId` by
* concatenating them with `|` separator.
*
* Cluster Sharding is often used together with `EventSourcedBehavior` for the entities.
* The `PersistenceId` of the `EventSourcedBehavior` can typically be constructed with:
* {{{
* PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId)
* }}}
*
* That format of the `PersistenceId` is not mandatory and only provided as a convenience of
* a "standardized" format.
*
* Another separator can be defined by using the `apply` that takes a `separator` parameter.
*
* The `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
* in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
* you should use `""` as the separator.
*
* @throws IllegalArgumentException if the `entityTypeHint` or `entityId` contains `|`
*/
def apply(entityTypeHint: String, entityId: String): PersistenceId =
apply(entityTypeHint, entityId, DefaultSeparator)
/**
* Constructs a [[PersistenceId]] from the given `entityTypeHint` and `entityId` by
* concatenating them with the `separator`.
*
* Cluster Sharding is often used together with `EventSourcedBehavior` for the entities.
* The `PersistenceId` of the `EventSourcedBehavior` can typically be constructed with:
* {{{
* PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId)
* }}}
*
* That format of the `PersistenceId` is not mandatory and only provided as a convenience of
* a "standardized" format.
*
* The default separator `|` is used by the `apply` that doesn't take a `separator` parameter.
*
* The `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
* in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
* you should use `""` as the separator.
*
* @throws IllegalArgumentException if the `entityTypeHint` or `entityId` contains `separator`
*/
def apply(entityTypeHint: String, entityId: String, separator: String): PersistenceId = {
if (separator.nonEmpty) {
if (entityId.contains(separator))
throw new IllegalArgumentException(s"entityId [$entityId] contains [$separator] which is a reserved character")
if (entityTypeHint.contains(separator))
throw new IllegalArgumentException(
s"entityTypeHint [$entityTypeHint] contains [$separator] which is a reserved character")
}
PersistenceId(entityTypeHint + separator + entityId)
}
/**
* Constructs a [[PersistenceId]] from the given `entityTypeHint` and `entityId` by
* concatenating them with `|` separator.
*
* Cluster Sharding is often used together with `EventSourcedBehavior` for the entities.
* The `PersistenceId` of the `EventSourcedBehavior` can typically be constructed with:
* {{{
* PersistenceId.of(entityContext.getEntityTypeKey().name(), entityContext.getEntityId())
* }}}
*
* That format of the `PersistenceId` is not mandatory and only provided as a convenience of
* a "standardized" format.
*
* Another separator can be defined by using the `PersistenceId.of` that takes a `separator` parameter.
*
* The `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
* in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
* you should use `""` as the separator.
*
* @throws IllegalArgumentException if the `entityTypeHint` or `entityId` contains `|`
*/
def of(entityTypeHint: String, entityId: String): PersistenceId =
apply(entityTypeHint, entityId)
/**
* Constructs a [[PersistenceId]] from the given `entityTypeHint` and `entityId` by
* concatenating them with the `separator`.
*
* Cluster Sharding is often used together with `EventSourcedBehavior` for the entities.
* The `PersistenceId` of the `EventSourcedBehavior` can typically be constructed with:
* {{{
* PersistenceId.of(entityContext.getEntityTypeKey().name(), entityContext.getEntityId())
* }}}
*
* That format of the `PersistenceId` is not mandatory and only provided as a convenience of
* a "standardized" format.
*
* The default separator `|` is used by the `apply` that doesn't take a `separator` parameter.
*
* The `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
* in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
* you should use `""` as the separator.
*
* @throws IllegalArgumentException if the `entityTypeHint` or `entityId` contains `separator`
*/
def of(entityTypeHint: String, entityId: String, separator: String): PersistenceId =
apply(entityTypeHint, entityId, separator)
/**
* Constructs a [[PersistenceId]] with `id` as the full unique identifier.
*/
def of(id: String): PersistenceId =
PersistenceId(id)
}
/**
* Unique identifier in the backend data store (journal and snapshot store) of the
* persistent actor.

View file

@ -160,8 +160,12 @@ public class BlogPostEntity
// #commands
// #behavior
public static Behavior<Command> create(String entityId) {
return Behaviors.setup(ctx -> new BlogPostEntity(new PersistenceId("Blog-" + entityId)));
public static Behavior<Command> create(String entityId, PersistenceId persistenceId) {
return Behaviors.setup(
context -> {
context.getLog().info("Starting BlogPostEntity {}", entityId);
return new BlogPostEntity(persistenceId);
});
}
private BlogPostEntity(PersistenceId persistenceId) {

View file

@ -0,0 +1,51 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
import akka.actor.testkit.typed.scaladsl.LogCapturing
import org.scalatest.Matchers
import org.scalatest.WordSpec
class PersistenceIdSpec extends WordSpec with Matchers with LogCapturing {
"PersistenceId" must {
"use | as default entityIdSeparator for compatibility with Lagom's scaladsl" in {
PersistenceId("MyType", "abc") should ===(PersistenceId("MyType|abc"))
}
"support custom separator for compatibility with Lagom's javadsl" in {
PersistenceId("MyType", "abc", "") should ===(PersistenceId("MyTypeabc"))
}
"support custom entityIdSeparator for compatibility with other naming" in {
PersistenceId("MyType", "abc", "#/#") should ===(PersistenceId("MyType#/#abc"))
}
"not allow | in entityTypeName because it's the default separator" in {
intercept[IllegalArgumentException] {
PersistenceId("Invalid | name", "abc")
}
}
"not allow custom separator in entityTypeName" in {
intercept[IllegalArgumentException] {
PersistenceId("Invalid name", "abc", " ")
}
}
"not allow | in entityId because it's the default separator" in {
intercept[IllegalArgumentException] {
PersistenceId("SomeType", "A|B")
}
}
"not allow custom separator in entityId" in {
intercept[IllegalArgumentException] {
PersistenceId("SomeType", "A#B", "#")
}
}
}
}

View file

@ -95,8 +95,8 @@ object BasicPersistentBehaviorCompileOnly {
import MyPersistentBehavior._
object RecoveryBehavior {
//#recovery
def apply(): Behavior[Command] =
//#recovery
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
@ -110,8 +110,8 @@ object BasicPersistentBehaviorCompileOnly {
}
object TaggingBehavior {
//#tagging
def apply(): Behavior[Command] =
//#tagging
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),

View file

@ -7,6 +7,7 @@ package docs.akka.persistence.typed
import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
@ -55,12 +56,12 @@ object BlogPostEntity {
//#commands
//#behavior
def apply(entityId: String): Behavior[Command] =
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId(s"Blog-$entityId"),
emptyState = BlankState,
commandHandler,
eventHandler)
def apply(entityId: String, persistenceId: PersistenceId): Behavior[Command] = {
Behaviors.setup { context =>
context.log.info("Starting BlogPostEntity {}", entityId)
EventSourcedBehavior[Command, Event, State](persistenceId, emptyState = BlankState, commandHandler, eventHandler)
}
}
//#behavior
//#command-handler

View file

@ -427,7 +427,6 @@ lazy val clusterTyped = akkaModule("akka-cluster-typed")
lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed")
.dependsOn(
clusterTyped % "compile->compile;test->test;multi-jvm->multi-jvm",
persistenceTyped,
clusterSharding,
actorTestkitTyped % "test->test",
actorTypedTests % "test->test",