Adds EventSourcedEntity with EnforcingReplies (#26692)

This commit is contained in:
Renato Cavalcanti 2019-05-17 09:09:18 +02:00 committed by Johan Andrén
parent 18d970fc8e
commit 8eef461b7b
8 changed files with 204 additions and 9 deletions

View file

@ -208,7 +208,7 @@ object Entity {
* settings can be defined using the `with` methods of the returned [[Entity]].
*
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent actors
* is very common and therefore the [[Entity.ofPersistentEntity]] is provided as convenience.
* is very common and therefore the [[Entity.ofEventSourcedEntity]] is provided as convenience.
*
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param createBehavior Create the behavior for an entity given a [[EntityContext]] (includes entityId)
@ -238,7 +238,7 @@ object Entity {
* @param createPersistentEntity Create the `PersistentEntity` for an entity given a [[EntityContext]] (includes entityId)
* @tparam Command The type of message the entity accepts
*/
def ofPersistentEntity[Command, Event, State](
def ofEventSourcedEntity[Command, Event, State](
typeKey: EntityTypeKey[Command],
createPersistentEntity: JFunction[EntityContext[Command], EventSourcedEntity[Command, Event, State]])
: Entity[Command, ShardingEnvelope[Command]] = {
@ -257,6 +257,41 @@ object Entity {
})
}
/**
* Defines how the [[EventSourcedEntityWithEnforcedReplies]] should be created. Used in [[ClusterSharding#init]]. Any [[Behavior]] can
* be used as a sharded entity actor, but the combination of sharding and persistent actors is very common
* and therefore this factory is provided as convenience.
*
* A [[EventSourcedEntityWithEnforcedReplies]] enforces that replies to commands are not forgotten.
* There will be compilation errors if the returned effect isn't a [[akka.persistence.typed.javadsl.ReplyEffect]], which can be
* created with `Effects().reply`, `Effects().noReply`, [[akka.persistence.typed.javadsl.Effect.thenReply]], or [[akka.persistence.typed.javadsl.Effect.thenNoReply]].
*
* More optional settings can be defined using the `with` methods of the returned [[Entity]].
*
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param createPersistentEntity Create the `PersistentEntity` for an entity given a [[EntityContext]] (includes entityId)
* @tparam Command The type of message the entity accepts
*/
def ofEventSourcedEntityWithEnforcedReplies[Command, Event, State](
typeKey: EntityTypeKey[Command],
createPersistentEntity: JFunction[
EntityContext[Command],
EventSourcedEntityWithEnforcedReplies[Command, Event, State]]): Entity[Command, ShardingEnvelope[Command]] = {
of(
typeKey,
new JFunction[EntityContext[Command], Behavior[Command]] {
override def apply(ctx: EntityContext[Command]): Behavior[Command] = {
val persistentEntity = createPersistentEntity(ctx)
if (persistentEntity.entityTypeKey != typeKey)
throw new IllegalArgumentException(
s"The [${persistentEntity.entityTypeKey}] of the PersistentEntity " +
s" [${persistentEntity.getClass.getName}] doesn't match expected $typeKey.")
persistentEntity
}
})
}
}
/**

View file

@ -8,7 +8,7 @@ import java.util.Optional
import akka.actor.typed.BackoffSupervisorStrategy
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.javadsl.EventSourcedBehavior
import akka.persistence.typed.javadsl.{ EventSourcedBehavior, EventSourcedBehaviorWithEnforcedReplies }
/**
* Any [[akka.actor.typed.Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent
@ -42,3 +42,40 @@ abstract class EventSourcedEntity[Command, Event, State] private (
}
}
/**
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent
* actors is very common and therefore this `PersistentEntity` class is provided as convenience.
*
* A [[EventSourcedEntityWithEnforcedReplies]] enforces that replies to commands are not forgotten.
* There will be compilation errors if the returned effect isn't a [[akka.persistence.typed.javadsl.ReplyEffect]], which can be
* created with `Effects().reply`, `Effects().noReply`, [[akka.persistence.typed.javadsl.Effect.thenReply]], or [[akka.persistence.typed.javadsl.Effect.thenNoReply]].
*
* It is a [[EventSourcedBehavior]] and is implemented in the same way. It selects the `persistenceId`
* automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using
* [[EntityTypeKey.persistenceIdFrom]].
*/
abstract class EventSourcedEntityWithEnforcedReplies[Command, Event, State] private (
val entityTypeKey: EntityTypeKey[Command],
val entityId: String,
persistenceId: PersistenceId,
onPersistFailure: Optional[BackoffSupervisorStrategy])
extends EventSourcedBehaviorWithEnforcedReplies[Command, Event, State](persistenceId, onPersistFailure) {
def this(entityTypeKey: EntityTypeKey[Command], entityId: String) = {
this(
entityTypeKey,
entityId,
persistenceId = entityTypeKey.persistenceIdFrom(entityId),
Optional.empty[BackoffSupervisorStrategy])
}
def this(entityTypeKey: EntityTypeKey[Command], entityId: String, onPersistFailure: BackoffSupervisorStrategy) = {
this(
entityTypeKey,
entityId,
persistenceId = entityTypeKey.persistenceIdFrom(entityId),
Optional.ofNullable(onPersistFailure))
}
}

View file

@ -3,9 +3,8 @@
*/
package akka.cluster.sharding.typed.scaladsl
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplyEffect }
object EventSourcedEntity {
@ -26,4 +25,30 @@ object EventSourcedEntity {
commandHandler: (State, Command) => Effect[Event, State],
eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] =
EventSourcedBehavior(entityTypeKey.persistenceIdFrom(entityId), emptyState, commandHandler, eventHandler)
/**
* Create a `Behavior` for a persistent actor that is used with Cluster Sharding
* and enforces that replies to commands are not forgotten.
*
* Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with [[Effect.reply]], [[Effect.noReply]], [[Effect.thenReply]], or [[Effect.thenNoReply]].
*
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent
* actors is very common and therefore this `PersistentEntity` is provided as convenience.
*
* It is a [[EventSourcedBehavior]] and is implemented in the same way. It selects the `persistenceId`
* automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using
* [[EntityTypeKey.persistenceIdFrom]].
*/
def withEnforcedReplies[Command <: ExpectingReply[_], Event, State](
entityTypeKey: EntityTypeKey[Command],
entityId: String,
emptyState: State,
commandHandler: (State, Command) ReplyEffect[Event, State],
eventHandler: (State, Event) State): EventSourcedBehavior[Command, Event, State] =
EventSourcedBehavior.withEnforcedReplies(
entityTypeKey.persistenceIdFrom(entityId),
emptyState,
commandHandler,
eventHandler)
}

View file

@ -127,7 +127,7 @@ public class ClusterShardingPersistenceTest extends JUnitSuite {
ClusterSharding sharding = ClusterSharding.get(testKit.system());
sharding.init(
Entity.ofPersistentEntity(
Entity.ofEventSourcedEntity(
TestPersistentEntity.ENTITY_TYPE_KEY,
entityContext -> new TestPersistentEntity(entityContext.getEntityId())));

View file

@ -0,0 +1,88 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed.javadsl;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.typed.ActorRef;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Join;
import akka.persistence.typed.ExpectingReply;
import akka.persistence.typed.javadsl.*;
import org.junit.ClassRule;
public class ShardingEventSourcedEntityWithEnforcedRepliesCompileOnlyTest {
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();
interface Command extends ExpectingReply<String> {}
static class Append implements Command {
public final String s;
private final ActorRef<String> replyToRef;
Append(String s, ActorRef<String> replyTo) {
this.s = s;
this.replyToRef = replyTo;
}
@Override
public ActorRef<String> replyTo() {
return replyToRef;
}
}
static class TestPersistentEntityWithEnforcedReplies
extends EventSourcedEntityWithEnforcedReplies<Command, String, String> {
public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
EntityTypeKey.create(Command.class, "HelloWorld");
public TestPersistentEntityWithEnforcedReplies(String entityId) {
super(ENTITY_TYPE_KEY, entityId);
}
@Override
public String emptyState() {
return "";
}
@Override
public CommandHandlerWithReply<Command, String, String> commandHandler() {
return newCommandHandlerWithReplyBuilder()
.forAnyState()
.onCommand(Append.class, this::add)
.build();
}
private ReplyEffect<String, String> add(String state, Append cmd) {
return Effect().persist(cmd.s).thenReply(cmd, s -> "Ok");
}
@Override
public EventHandler<String, String> eventHandler() {
return newEventHandlerBuilder().forAnyState().onEvent(String.class, this::applyEvent).build();
}
private String applyEvent(String state, String evt) {
if (state.trim().isEmpty()) return evt;
else return state + "|" + evt;
}
}
private void shardingForEventSourcedEntityWithReplies() {
// initialize first time only
Cluster cluster = Cluster.get(testKit.system());
cluster.manager().tell(new Join(cluster.selfMember().address()));
ClusterSharding sharding = ClusterSharding.get(testKit.system());
sharding.init(
Entity.ofEventSourcedEntityWithEnforcedReplies(
TestPersistentEntityWithEnforcedReplies.ENTITY_TYPE_KEY,
entityContext ->
new TestPersistentEntityWithEnforcedReplies(entityContext.getEntityId())));
}
}

View file

@ -42,7 +42,7 @@ public class HelloWorldEventSourcedEntityExampleTest extends JUnitSuite {
ClusterSharding sharding = ClusterSharding.get(testKit.system());
sharding.init(
Entity.ofPersistentEntity(
Entity.ofEventSourcedEntity(
HelloWorld.ENTITY_TYPE_KEY,
ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId())));
_sharding = sharding;

View file

@ -43,7 +43,7 @@ public class HelloWorldPersistentEntityExample {
sharding = ClusterSharding.get(system);
sharding.init(
Entity.ofPersistentEntity(
Entity.ofEventSourcedEntity(
HelloWorld.ENTITY_TYPE_KEY,
ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId())));
}

View file

@ -188,3 +188,13 @@ instead either run under `/system/localReceptionist` or `/system/clusterReceptio
The path change makes it impossible to do a rolling upgrade from 2.5 to 2.6 if you use Akka Typed and the receptionist
as the old and the new nodes receptionists will not be able to communicate.
### Akka Typed API changes
Akka Typed APIs are still marked as [may change](../common/may-change.md) and therefore its API can still change without deprecation period. The following is a list of API changes since the latest release:
* Factory method `Entity.ofPersistentEntity` is renamed to `Entity.ofEventSourcedEntity` in the Java API for Akka Cluster Sharding Typed.
* New abstract class `EventSourcedEntityWithEnforcedReplies` in Java API for Akka Cluster Sharding Typed and corresponding factory method `Entity.ofEventSourcedEntityWithEnforcedReplies` to ease the creation of `EventSourcedBehavior` with enforced replies.
* New method `EventSourcedEntity.withEnforcedReplies` added to Scala API to ease the creation of `EventSourcedBehavior` with enforced replies.