diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index 95763f0654..fb88cf5b39 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -208,7 +208,7 @@ object Entity { * settings can be defined using the `with` methods of the returned [[Entity]]. * * Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent actors - * is very common and therefore the [[Entity.ofPersistentEntity]] is provided as convenience. + * is very common and therefore the [[Entity.ofEventSourcedEntity]] is provided as convenience. * * @param typeKey A key that uniquely identifies the type of entity in this cluster * @param createBehavior Create the behavior for an entity given a [[EntityContext]] (includes entityId) @@ -238,7 +238,7 @@ object Entity { * @param createPersistentEntity Create the `PersistentEntity` for an entity given a [[EntityContext]] (includes entityId) * @tparam Command The type of message the entity accepts */ - def ofPersistentEntity[Command, Event, State]( + def ofEventSourcedEntity[Command, Event, State]( typeKey: EntityTypeKey[Command], createPersistentEntity: JFunction[EntityContext[Command], EventSourcedEntity[Command, Event, State]]) : Entity[Command, ShardingEnvelope[Command]] = { @@ -257,6 +257,41 @@ object Entity { }) } + /** + * Defines how the [[EventSourcedEntityWithEnforcedReplies]] should be created. Used in [[ClusterSharding#init]]. Any [[Behavior]] can + * be used as a sharded entity actor, but the combination of sharding and persistent actors is very common + * and therefore this factory is provided as convenience. + * + * A [[EventSourcedEntityWithEnforcedReplies]] enforces that replies to commands are not forgotten. + * There will be compilation errors if the returned effect isn't a [[akka.persistence.typed.javadsl.ReplyEffect]], which can be + * created with `Effects().reply`, `Effects().noReply`, [[akka.persistence.typed.javadsl.Effect.thenReply]], or [[akka.persistence.typed.javadsl.Effect.thenNoReply]]. + * + * More optional settings can be defined using the `with` methods of the returned [[Entity]]. + * + * @param typeKey A key that uniquely identifies the type of entity in this cluster + * @param createPersistentEntity Create the `PersistentEntity` for an entity given a [[EntityContext]] (includes entityId) + * @tparam Command The type of message the entity accepts + */ + def ofEventSourcedEntityWithEnforcedReplies[Command, Event, State]( + typeKey: EntityTypeKey[Command], + createPersistentEntity: JFunction[ + EntityContext[Command], + EventSourcedEntityWithEnforcedReplies[Command, Event, State]]): Entity[Command, ShardingEnvelope[Command]] = { + + of( + typeKey, + new JFunction[EntityContext[Command], Behavior[Command]] { + override def apply(ctx: EntityContext[Command]): Behavior[Command] = { + val persistentEntity = createPersistentEntity(ctx) + if (persistentEntity.entityTypeKey != typeKey) + throw new IllegalArgumentException( + s"The [${persistentEntity.entityTypeKey}] of the PersistentEntity " + + s" [${persistentEntity.getClass.getName}] doesn't match expected $typeKey.") + persistentEntity + } + }) + } + } /** diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/EventSourcedEntity.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/EventSourcedEntity.scala index 749b11d52d..722a1c5f46 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/EventSourcedEntity.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/EventSourcedEntity.scala @@ -8,7 +8,7 @@ import java.util.Optional import akka.actor.typed.BackoffSupervisorStrategy import akka.persistence.typed.PersistenceId -import akka.persistence.typed.javadsl.EventSourcedBehavior +import akka.persistence.typed.javadsl.{ EventSourcedBehavior, EventSourcedBehaviorWithEnforcedReplies } /** * Any [[akka.actor.typed.Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent @@ -42,3 +42,40 @@ abstract class EventSourcedEntity[Command, Event, State] private ( } } + +/** + * Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent + * actors is very common and therefore this `PersistentEntity` class is provided as convenience. + * + * A [[EventSourcedEntityWithEnforcedReplies]] enforces that replies to commands are not forgotten. + * There will be compilation errors if the returned effect isn't a [[akka.persistence.typed.javadsl.ReplyEffect]], which can be + * created with `Effects().reply`, `Effects().noReply`, [[akka.persistence.typed.javadsl.Effect.thenReply]], or [[akka.persistence.typed.javadsl.Effect.thenNoReply]]. + * + * It is a [[EventSourcedBehavior]] and is implemented in the same way. It selects the `persistenceId` + * automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using + * [[EntityTypeKey.persistenceIdFrom]]. + */ +abstract class EventSourcedEntityWithEnforcedReplies[Command, Event, State] private ( + val entityTypeKey: EntityTypeKey[Command], + val entityId: String, + persistenceId: PersistenceId, + onPersistFailure: Optional[BackoffSupervisorStrategy]) + extends EventSourcedBehaviorWithEnforcedReplies[Command, Event, State](persistenceId, onPersistFailure) { + + def this(entityTypeKey: EntityTypeKey[Command], entityId: String) = { + this( + entityTypeKey, + entityId, + persistenceId = entityTypeKey.persistenceIdFrom(entityId), + Optional.empty[BackoffSupervisorStrategy]) + } + + def this(entityTypeKey: EntityTypeKey[Command], entityId: String, onPersistFailure: BackoffSupervisorStrategy) = { + this( + entityTypeKey, + entityId, + persistenceId = entityTypeKey.persistenceIdFrom(entityId), + Optional.ofNullable(onPersistFailure)) + } + +} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/EventSourcedEntity.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/EventSourcedEntity.scala index 2e77d2d5bc..38a091b3d0 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/EventSourcedEntity.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/EventSourcedEntity.scala @@ -3,9 +3,8 @@ */ package akka.cluster.sharding.typed.scaladsl - -import akka.persistence.typed.scaladsl.Effect -import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.persistence.typed.ExpectingReply +import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplyEffect } object EventSourcedEntity { @@ -26,4 +25,30 @@ object EventSourcedEntity { commandHandler: (State, Command) => Effect[Event, State], eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = EventSourcedBehavior(entityTypeKey.persistenceIdFrom(entityId), emptyState, commandHandler, eventHandler) + + /** + * Create a `Behavior` for a persistent actor that is used with Cluster Sharding + * and enforces that replies to commands are not forgotten. + * + * Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be + * created with [[Effect.reply]], [[Effect.noReply]], [[Effect.thenReply]], or [[Effect.thenNoReply]]. + * + * Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent + * actors is very common and therefore this `PersistentEntity` is provided as convenience. + * + * It is a [[EventSourcedBehavior]] and is implemented in the same way. It selects the `persistenceId` + * automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using + * [[EntityTypeKey.persistenceIdFrom]]. + */ + def withEnforcedReplies[Command <: ExpectingReply[_], Event, State]( + entityTypeKey: EntityTypeKey[Command], + entityId: String, + emptyState: State, + commandHandler: (State, Command) ⇒ ReplyEffect[Event, State], + eventHandler: (State, Event) ⇒ State): EventSourcedBehavior[Command, Event, State] = + EventSourcedBehavior.withEnforcedReplies( + entityTypeKey.persistenceIdFrom(entityId), + emptyState, + commandHandler, + eventHandler) } diff --git a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingPersistenceTest.java b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingPersistenceTest.java index 2af5ab2f48..27cc49265b 100644 --- a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingPersistenceTest.java +++ b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingPersistenceTest.java @@ -127,7 +127,7 @@ public class ClusterShardingPersistenceTest extends JUnitSuite { ClusterSharding sharding = ClusterSharding.get(testKit.system()); sharding.init( - Entity.ofPersistentEntity( + Entity.ofEventSourcedEntity( TestPersistentEntity.ENTITY_TYPE_KEY, entityContext -> new TestPersistentEntity(entityContext.getEntityId()))); diff --git a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ShardingEventSourcedEntityWithEnforcedRepliesCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ShardingEventSourcedEntityWithEnforcedRepliesCompileOnlyTest.java new file mode 100644 index 0000000000..bf30d8ba4e --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ShardingEventSourcedEntityWithEnforcedRepliesCompileOnlyTest.java @@ -0,0 +1,88 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.cluster.sharding.typed.javadsl; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.typed.ActorRef; +import akka.cluster.typed.Cluster; +import akka.cluster.typed.Join; +import akka.persistence.typed.ExpectingReply; +import akka.persistence.typed.javadsl.*; +import org.junit.ClassRule; + +public class ShardingEventSourcedEntityWithEnforcedRepliesCompileOnlyTest { + + @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(); + + interface Command extends ExpectingReply {} + + static class Append implements Command { + public final String s; + private final ActorRef replyToRef; + + Append(String s, ActorRef replyTo) { + this.s = s; + this.replyToRef = replyTo; + } + + @Override + public ActorRef replyTo() { + return replyToRef; + } + } + + static class TestPersistentEntityWithEnforcedReplies + extends EventSourcedEntityWithEnforcedReplies { + + public static final EntityTypeKey ENTITY_TYPE_KEY = + EntityTypeKey.create(Command.class, "HelloWorld"); + + public TestPersistentEntityWithEnforcedReplies(String entityId) { + super(ENTITY_TYPE_KEY, entityId); + } + + @Override + public String emptyState() { + return ""; + } + + @Override + public CommandHandlerWithReply commandHandler() { + return newCommandHandlerWithReplyBuilder() + .forAnyState() + .onCommand(Append.class, this::add) + .build(); + } + + private ReplyEffect add(String state, Append cmd) { + return Effect().persist(cmd.s).thenReply(cmd, s -> "Ok"); + } + + @Override + public EventHandler eventHandler() { + return newEventHandlerBuilder().forAnyState().onEvent(String.class, this::applyEvent).build(); + } + + private String applyEvent(String state, String evt) { + if (state.trim().isEmpty()) return evt; + else return state + "|" + evt; + } + } + + private void shardingForEventSourcedEntityWithReplies() { + + // initialize first time only + Cluster cluster = Cluster.get(testKit.system()); + cluster.manager().tell(new Join(cluster.selfMember().address())); + + ClusterSharding sharding = ClusterSharding.get(testKit.system()); + + sharding.init( + Entity.ofEventSourcedEntityWithEnforcedReplies( + TestPersistentEntityWithEnforcedReplies.ENTITY_TYPE_KEY, + entityContext -> + new TestPersistentEntityWithEnforcedReplies(entityContext.getEntityId()))); + } +} diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldEventSourcedEntityExampleTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldEventSourcedEntityExampleTest.java index f240620b49..132749ba5b 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldEventSourcedEntityExampleTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldEventSourcedEntityExampleTest.java @@ -42,7 +42,7 @@ public class HelloWorldEventSourcedEntityExampleTest extends JUnitSuite { ClusterSharding sharding = ClusterSharding.get(testKit.system()); sharding.init( - Entity.ofPersistentEntity( + Entity.ofEventSourcedEntity( HelloWorld.ENTITY_TYPE_KEY, ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId()))); _sharding = sharding; diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java index 2393f65de9..abd9b6b54b 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java @@ -43,7 +43,7 @@ public class HelloWorldPersistentEntityExample { sharding = ClusterSharding.get(system); sharding.init( - Entity.ofPersistentEntity( + Entity.ofEventSourcedEntity( HelloWorld.ENTITY_TYPE_KEY, ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId()))); } diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 641539cac5..9dcded2134 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -188,3 +188,13 @@ instead either run under `/system/localReceptionist` or `/system/clusterReceptio The path change makes it impossible to do a rolling upgrade from 2.5 to 2.6 if you use Akka Typed and the receptionist as the old and the new nodes receptionists will not be able to communicate. + + +### Akka Typed API changes + +Akka Typed APIs are still marked as [may change](../common/may-change.md) and therefore its API can still change without deprecation period. The following is a list of API changes since the latest release: + +* Factory method `Entity.ofPersistentEntity` is renamed to `Entity.ofEventSourcedEntity` in the Java API for Akka Cluster Sharding Typed. +* New abstract class `EventSourcedEntityWithEnforcedReplies` in Java API for Akka Cluster Sharding Typed and corresponding factory method `Entity.ofEventSourcedEntityWithEnforcedReplies` to ease the creation of `EventSourcedBehavior` with enforced replies. +* New method `EventSourcedEntity.withEnforcedReplies` added to Scala API to ease the creation of `EventSourcedBehavior` with enforced replies. +