From 7348939ff4c48da8faccf222303893d9c4b61aca Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 5 Aug 2020 10:33:48 +0200 Subject: [PATCH] DRY ReplicatedEventSourcedBehavior.apply (#29454) * and change signature of withEventPublishing and withDirectReplication --- .../typed/ReplicatedShardingSettings.scala | 8 ++-- .../typed/ShardingDirectReplication.scala | 4 +- .../paradox/typed/replicated-eventsourcing.md | 2 +- .../typed/ReplicatedAuctionExampleTest.java | 30 +++++++----- .../typed/EventPublishingSpec.scala | 2 +- .../internal/EventSourcedBehaviorImpl.scala | 4 +- .../typed/javadsl/EventSourcedBehavior.scala | 10 +++- .../ReplicatedEventSourcedBehavior.scala | 47 ++----------------- .../typed/scaladsl/EventSourcedBehavior.scala | 2 +- 9 files changed, 42 insertions(+), 67 deletions(-) diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingSettings.scala index 799e243960..332241f111 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingSettings.scala @@ -66,13 +66,13 @@ final class ReplicatedShardingSettings[M, E] private ( /** * Start direct replication over sharding when replicated sharding starts up, requires the entities - * to also have it enabled through [[akka.persistence.typed.scaladsl.EventSourcedBehavior#withEventPublishing()]] - * or [[akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior#withEventPublishing()]] + * to also have it enabled through [[akka.persistence.typed.scaladsl.EventSourcedBehavior.withEventPublishing]] + * or [[akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior.withEventPublishing]] * to work. * */ - def withDirectReplication(): ReplicatedShardingSettings[M, E] = - new ReplicatedShardingSettings(replicas, directReplication = true) + def withDirectReplication(enabled: Boolean): ReplicatedShardingSettings[M, E] = + new ReplicatedShardingSettings(replicas, directReplication = enabled) } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala index f667348a21..97edb4fa0f 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingDirectReplication.scala @@ -22,9 +22,9 @@ import akka.util.ccompat.JavaConverters._ * Akka Cluster. * * This actor should be started once on each node where Replicated Event Sourced entities will run (the same nodes that you start - * sharding on). The entities should be set up with [[akka.persistence.typed.scaladsl.EventSourcedBehavior#withEventPublishing()]] + * sharding on). The entities should be set up with [[akka.persistence.typed.scaladsl.EventSourcedBehavior.withEventPublishing]] * or [[akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior#withEventPublishing()]] - * If using [[ReplicatedSharding]] the replication can be enabled through [[ReplicatedShardingSettings#withDirectReplication()]] + * If using [[ReplicatedSharding]] the replication can be enabled through [[ReplicatedShardingSettings.withDirectReplication]] * instead of starting this actor manually. * * Subscribes to locally written events through the event stream and sends the seen events to all the sharded replicas diff --git a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md index 66e8932173..f5c1df80b8 100644 --- a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md +++ b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md @@ -317,7 +317,7 @@ query is still needed as delivery is not guaranteed, but can be configured to po events will arrive at the replicas through the cluster. To enable this feature you first need to enable event publishing on the @scala[`EventSourcedBehavior`]@java[`ReplicatedEventSourcedBehavior`] with `withEventPublishing` -and then enable direct replication through `withDirectReplication()` on @apidoc[ReplicatedShardingSettings] (if not using +and then enable direct replication through `withDirectReplication(true)` on @apidoc[ReplicatedShardingSettings] (if not using replicated sharding the replication can be run standalone by starting the @apidoc[ShardingDirectReplication] actor). The "event publishing" feature publishes each event to the local system event bus as a side effect after it has been written, diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java index 70d38cf566..f7274a7f08 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java @@ -115,7 +115,7 @@ class ReplicatedAuctionExample this.setup = setup; } - //#setup + // #setup static class AuctionSetup { final String name; final Bid initialBid; // the initial bid is the minimum price bidden at start time by the owner @@ -130,7 +130,7 @@ class ReplicatedAuctionExample this.responsibleForClosing = responsibleForClosing; } } - //#setup + // #setup public static final class Bid implements CborSerializable { public final String bidder; @@ -146,11 +146,13 @@ class ReplicatedAuctionExample } } - //#commands + // #commands interface Command extends CborSerializable {} + public enum Finish implements Command { INSTANCE } + public static final class OfferBid implements Command { public final String bidder; public final int offer; @@ -160,6 +162,7 @@ class ReplicatedAuctionExample this.offer = offer; } } + public static final class GetHighestBid implements Command { public final ActorRef replyTo; @@ -167,18 +170,21 @@ class ReplicatedAuctionExample this.replyTo = replyTo; } } + public static final class IsClosed implements Command { public final ActorRef replyTo; + public IsClosed(ActorRef replyTo) { this.replyTo = replyTo; } } + private enum Close implements Command { INSTANCE } - //#commands + // #commands - //#events + // #events interface Event extends CborSerializable {} public static final class BidRegistered implements Event { @@ -210,9 +216,9 @@ class ReplicatedAuctionExample this.amount = amount; } } - //#events + // #events - //#state + // #state static class AuctionState implements CborSerializable { final boolean stillRunning; @@ -270,14 +276,14 @@ class ReplicatedAuctionExample return !stillRunning && finishedAtDc.isEmpty(); } } - //#state + // #state @Override public AuctionState emptyState() { return new AuctionState(true, setup.initialBid, setup.initialBid.offer, Collections.emptySet()); } - //#command-handler + // #command-handler @Override public CommandHandler commandHandler() { @@ -348,7 +354,7 @@ class ReplicatedAuctionExample return builder.build(); } - //#command-handler + // #command-handler @Override public EventHandler eventHandler() { @@ -377,7 +383,7 @@ class ReplicatedAuctionExample .build(); } - //#event-triggers + // #event-triggers private void eventTriggers(AuctionFinished event, AuctionState newState) { if (newState.finishedAtDc.contains(getReplicationContext().replicaId().id())) { if (shouldClose(newState)) { @@ -387,7 +393,7 @@ class ReplicatedAuctionExample context.getSelf().tell(Finish.INSTANCE); } } - //#event-triggers + // #event-triggers private boolean shouldClose(AuctionState state) { return setup.responsibleForClosing diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/EventPublishingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/EventPublishingSpec.scala index 3f33a41381..62cab82ff4 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/EventPublishingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/EventPublishingSpec.scala @@ -35,7 +35,7 @@ object EventPublishingSpec { }, (state, event) => state + event) .withTagger(evt => if (evt.tagIt) Set("tag") else Set.empty) - .withEventPublishing() + .withEventPublishing(enabled = true) } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index 56daa2493d..09503e67f2 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -251,8 +251,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( copy(recovery = recovery.toClassic) } - override def withEventPublishing(): EventSourcedBehavior[Command, Event, State] = { - copy(publishEvents = true) + override def withEventPublishing(enabled: Boolean): EventSourcedBehavior[Command, Event, State] = { + copy(publishEvents = enabled) } override private[akka] def withReplication( diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala index fc0a1cf57c..ec5ea886c2 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala @@ -178,8 +178,14 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] ( /** * INTERNAL API: DeferredBehavior init, not for user extension */ - @InternalApi override def apply(context: typed.TypedActorContext[Command]): Behavior[Command] = { - // Note: duplicated in ReplicatedEventSourcedBehavior to not break source compatibility + @InternalApi override def apply(context: typed.TypedActorContext[Command]): Behavior[Command] = + createEventSourcedBehavior() + + /** + * INTERNAL API + */ + @InternalApi private[akka] final def createEventSourcedBehavior() + : scaladsl.EventSourcedBehavior[Command, Event, State] = { val snapshotWhen: (State, Event, Long) => Boolean = (state, event, seqNr) => shouldSnapshot(state, event, seqNr) val tagger: Event => Set[String] = { event => diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcedBehavior.scala index 7346615e8f..f660e813d6 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcedBehavior.scala @@ -11,9 +11,7 @@ import akka.actor.typed.Behavior import akka.actor.typed.TypedActorContext import akka.annotation.ApiMayChange import akka.annotation.InternalApi -import akka.persistence.typed.internal import akka.persistence.typed.internal.ReplicationContextImpl -import akka.persistence.typed.internal.EffectImpl /** * Base class for replicated event sourced behaviors. @@ -27,7 +25,8 @@ abstract class ReplicatedEventSourcedBehavior[Command, Event, State]( def this(replicationContext: ReplicationContext) = this(replicationContext, Optional.empty()) /** - * Override and return true to publish events to the system event stream as [[akka.persistence.typed.PublishedEvent]] after they have been persisted + * Override and return true to publish events to the system event stream as + * [[akka.persistence.typed.PublishedEvent]] after they have been persisted. */ def withEventPublishing: Boolean = false @@ -37,45 +36,9 @@ abstract class ReplicatedEventSourcedBehavior[Command, Event, State]( * INTERNAL API: DeferredBehavior init, not for user extension */ @InternalApi override def apply(context: TypedActorContext[Command]): Behavior[Command] = { - // Note: duplicated in EventSourcedBehavior to not break source compatibility - val snapshotWhen: (State, Event, Long) => Boolean = (state, event, seqNr) => shouldSnapshot(state, event, seqNr) - - val tagger: Event => Set[String] = { event => - import akka.util.ccompat.JavaConverters._ - val tags = tagsFor(event) - if (tags.isEmpty) Set.empty - else tags.asScala.toSet - } - - val behavior = new internal.EventSourcedBehaviorImpl[Command, Event, State]( - persistenceId, - emptyState, - (state, cmd) => commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]], - eventHandler()(_, _), - getClass) - .snapshotWhen(snapshotWhen) - .withRetention(retentionCriteria.asScala) - .withTagger(tagger) - .eventAdapter(eventAdapter()) - .snapshotAdapter(snapshotAdapter()) - .withJournalPluginId(journalPluginId) - .withSnapshotPluginId(snapshotPluginId) - .withRecovery(recovery.asScala) - // context not user extendable so there should never be any other impls + createEventSourcedBehavior() + // context not user extendable so there should never be any other impls .withReplication(replicationContext.asInstanceOf[ReplicationContextImpl]) - - val handler = signalHandler() - val behaviorWithSignalHandler = - if (handler.isEmpty) behavior - else behavior.receiveSignal(handler.handler) - - val behaviorWithOnPersistFailure = - if (onPersistFailure.isPresent) - behaviorWithSignalHandler.onPersistFailure(onPersistFailure.get) - else - behaviorWithSignalHandler - - if (withEventPublishing) behaviorWithOnPersistFailure.withEventPublishing() - else behaviorWithOnPersistFailure + .withEventPublishing(withEventPublishing) } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index 0c4c114ed8..60c13ab218 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -222,7 +222,7 @@ object EventSourcedBehavior { * Publish events to the system event stream as [[akka.persistence.typed.PublishedEvent]] after they have been persisted */ @ApiMayChange - def withEventPublishing(): EventSourcedBehavior[Command, Event, State] + def withEventPublishing(enabled: Boolean): EventSourcedBehavior[Command, Event, State] /** * INTERNAL API