DRY ReplicatedEventSourcedBehavior.apply (#29454)

* and change signature of withEventPublishing and withDirectReplication
This commit is contained in:
Patrik Nordwall 2020-08-05 10:33:48 +02:00 committed by Christopher Batey
parent cd821fe3f8
commit 7348939ff4
9 changed files with 42 additions and 67 deletions

View file

@ -66,13 +66,13 @@ final class ReplicatedShardingSettings[M, E] private (
/**
* Start direct replication over sharding when replicated sharding starts up, requires the entities
* to also have it enabled through [[akka.persistence.typed.scaladsl.EventSourcedBehavior#withEventPublishing()]]
* or [[akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior#withEventPublishing()]]
* to also have it enabled through [[akka.persistence.typed.scaladsl.EventSourcedBehavior.withEventPublishing]]
* or [[akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior.withEventPublishing]]
* to work.
*
*/
def withDirectReplication(): ReplicatedShardingSettings[M, E] =
new ReplicatedShardingSettings(replicas, directReplication = true)
def withDirectReplication(enabled: Boolean): ReplicatedShardingSettings[M, E] =
new ReplicatedShardingSettings(replicas, directReplication = enabled)
}

View file

@ -22,9 +22,9 @@ import akka.util.ccompat.JavaConverters._
* Akka Cluster.
*
* This actor should be started once on each node where Replicated Event Sourced entities will run (the same nodes that you start
* sharding on). The entities should be set up with [[akka.persistence.typed.scaladsl.EventSourcedBehavior#withEventPublishing()]]
* sharding on). The entities should be set up with [[akka.persistence.typed.scaladsl.EventSourcedBehavior.withEventPublishing]]
* or [[akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior#withEventPublishing()]]
* If using [[ReplicatedSharding]] the replication can be enabled through [[ReplicatedShardingSettings#withDirectReplication()]]
* If using [[ReplicatedSharding]] the replication can be enabled through [[ReplicatedShardingSettings.withDirectReplication]]
* instead of starting this actor manually.
*
* Subscribes to locally written events through the event stream and sends the seen events to all the sharded replicas

View file

@ -317,7 +317,7 @@ query is still needed as delivery is not guaranteed, but can be configured to po
events will arrive at the replicas through the cluster.
To enable this feature you first need to enable event publishing on the @scala[`EventSourcedBehavior`]@java[`ReplicatedEventSourcedBehavior`] with `withEventPublishing`
and then enable direct replication through `withDirectReplication()` on @apidoc[ReplicatedShardingSettings] (if not using
and then enable direct replication through `withDirectReplication(true)` on @apidoc[ReplicatedShardingSettings] (if not using
replicated sharding the replication can be run standalone by starting the @apidoc[ShardingDirectReplication] actor).
The "event publishing" feature publishes each event to the local system event bus as a side effect after it has been written,

View file

@ -115,7 +115,7 @@ class ReplicatedAuctionExample
this.setup = setup;
}
//#setup
// #setup
static class AuctionSetup {
final String name;
final Bid initialBid; // the initial bid is the minimum price bidden at start time by the owner
@ -130,7 +130,7 @@ class ReplicatedAuctionExample
this.responsibleForClosing = responsibleForClosing;
}
}
//#setup
// #setup
public static final class Bid implements CborSerializable {
public final String bidder;
@ -146,11 +146,13 @@ class ReplicatedAuctionExample
}
}
//#commands
// #commands
interface Command extends CborSerializable {}
public enum Finish implements Command {
INSTANCE
}
public static final class OfferBid implements Command {
public final String bidder;
public final int offer;
@ -160,6 +162,7 @@ class ReplicatedAuctionExample
this.offer = offer;
}
}
public static final class GetHighestBid implements Command {
public final ActorRef<Bid> replyTo;
@ -167,18 +170,21 @@ class ReplicatedAuctionExample
this.replyTo = replyTo;
}
}
public static final class IsClosed implements Command {
public final ActorRef<Boolean> replyTo;
public IsClosed(ActorRef<Boolean> replyTo) {
this.replyTo = replyTo;
}
}
private enum Close implements Command {
INSTANCE
}
//#commands
// #commands
//#events
// #events
interface Event extends CborSerializable {}
public static final class BidRegistered implements Event {
@ -210,9 +216,9 @@ class ReplicatedAuctionExample
this.amount = amount;
}
}
//#events
// #events
//#state
// #state
static class AuctionState implements CborSerializable {
final boolean stillRunning;
@ -270,14 +276,14 @@ class ReplicatedAuctionExample
return !stillRunning && finishedAtDc.isEmpty();
}
}
//#state
// #state
@Override
public AuctionState emptyState() {
return new AuctionState(true, setup.initialBid, setup.initialBid.offer, Collections.emptySet());
}
//#command-handler
// #command-handler
@Override
public CommandHandler<Command, Event, AuctionState> commandHandler() {
@ -348,7 +354,7 @@ class ReplicatedAuctionExample
return builder.build();
}
//#command-handler
// #command-handler
@Override
public EventHandler<AuctionState, Event> eventHandler() {
@ -377,7 +383,7 @@ class ReplicatedAuctionExample
.build();
}
//#event-triggers
// #event-triggers
private void eventTriggers(AuctionFinished event, AuctionState newState) {
if (newState.finishedAtDc.contains(getReplicationContext().replicaId().id())) {
if (shouldClose(newState)) {
@ -387,7 +393,7 @@ class ReplicatedAuctionExample
context.getSelf().tell(Finish.INSTANCE);
}
}
//#event-triggers
// #event-triggers
private boolean shouldClose(AuctionState state) {
return setup.responsibleForClosing

View file

@ -35,7 +35,7 @@ object EventPublishingSpec {
},
(state, event) => state + event)
.withTagger(evt => if (evt.tagIt) Set("tag") else Set.empty)
.withEventPublishing()
.withEventPublishing(enabled = true)
}
}

View file

@ -251,8 +251,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
copy(recovery = recovery.toClassic)
}
override def withEventPublishing(): EventSourcedBehavior[Command, Event, State] = {
copy(publishEvents = true)
override def withEventPublishing(enabled: Boolean): EventSourcedBehavior[Command, Event, State] = {
copy(publishEvents = enabled)
}
override private[akka] def withReplication(

View file

@ -178,8 +178,14 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
/**
* INTERNAL API: DeferredBehavior init, not for user extension
*/
@InternalApi override def apply(context: typed.TypedActorContext[Command]): Behavior[Command] = {
// Note: duplicated in ReplicatedEventSourcedBehavior to not break source compatibility
@InternalApi override def apply(context: typed.TypedActorContext[Command]): Behavior[Command] =
createEventSourcedBehavior()
/**
* INTERNAL API
*/
@InternalApi private[akka] final def createEventSourcedBehavior()
: scaladsl.EventSourcedBehavior[Command, Event, State] = {
val snapshotWhen: (State, Event, Long) => Boolean = (state, event, seqNr) => shouldSnapshot(state, event, seqNr)
val tagger: Event => Set[String] = { event =>

View file

@ -11,9 +11,7 @@ import akka.actor.typed.Behavior
import akka.actor.typed.TypedActorContext
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.persistence.typed.internal
import akka.persistence.typed.internal.ReplicationContextImpl
import akka.persistence.typed.internal.EffectImpl
/**
* Base class for replicated event sourced behaviors.
@ -27,7 +25,8 @@ abstract class ReplicatedEventSourcedBehavior[Command, Event, State](
def this(replicationContext: ReplicationContext) = this(replicationContext, Optional.empty())
/**
* Override and return true to publish events to the system event stream as [[akka.persistence.typed.PublishedEvent]] after they have been persisted
* Override and return true to publish events to the system event stream as
* [[akka.persistence.typed.PublishedEvent]] after they have been persisted.
*/
def withEventPublishing: Boolean = false
@ -37,45 +36,9 @@ abstract class ReplicatedEventSourcedBehavior[Command, Event, State](
* INTERNAL API: DeferredBehavior init, not for user extension
*/
@InternalApi override def apply(context: TypedActorContext[Command]): Behavior[Command] = {
// Note: duplicated in EventSourcedBehavior to not break source compatibility
val snapshotWhen: (State, Event, Long) => Boolean = (state, event, seqNr) => shouldSnapshot(state, event, seqNr)
val tagger: Event => Set[String] = { event =>
import akka.util.ccompat.JavaConverters._
val tags = tagsFor(event)
if (tags.isEmpty) Set.empty
else tags.asScala.toSet
}
val behavior = new internal.EventSourcedBehaviorImpl[Command, Event, State](
persistenceId,
emptyState,
(state, cmd) => commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]],
eventHandler()(_, _),
getClass)
.snapshotWhen(snapshotWhen)
.withRetention(retentionCriteria.asScala)
.withTagger(tagger)
.eventAdapter(eventAdapter())
.snapshotAdapter(snapshotAdapter())
.withJournalPluginId(journalPluginId)
.withSnapshotPluginId(snapshotPluginId)
.withRecovery(recovery.asScala)
// context not user extendable so there should never be any other impls
createEventSourcedBehavior()
// context not user extendable so there should never be any other impls
.withReplication(replicationContext.asInstanceOf[ReplicationContextImpl])
val handler = signalHandler()
val behaviorWithSignalHandler =
if (handler.isEmpty) behavior
else behavior.receiveSignal(handler.handler)
val behaviorWithOnPersistFailure =
if (onPersistFailure.isPresent)
behaviorWithSignalHandler.onPersistFailure(onPersistFailure.get)
else
behaviorWithSignalHandler
if (withEventPublishing) behaviorWithOnPersistFailure.withEventPublishing()
else behaviorWithOnPersistFailure
.withEventPublishing(withEventPublishing)
}
}

View file

@ -222,7 +222,7 @@ object EventSourcedBehavior {
* Publish events to the system event stream as [[akka.persistence.typed.PublishedEvent]] after they have been persisted
*/
@ApiMayChange
def withEventPublishing(): EventSourcedBehavior[Command, Event, State]
def withEventPublishing(enabled: Boolean): EventSourcedBehavior[Command, Event, State]
/**
* INTERNAL API