From 827dc14dbb4ce966516414bdda7f803a4e5b354f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 22 Jul 2020 12:26:36 +0200 Subject: [PATCH] EventPublishing API for Java (#29413) --- ...ctiveActiveShardingDirectReplication.scala | 2 +- .../typed/ActiveActiveShardingSettings.scala | 4 ++-- .../typed/persistence-active-active.md | 4 ++-- .../ActiveActiveEventSourcedBehavior.scala | 19 +++++++++++++++---- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala index 0ff09d5b6f..5e1ba61e80 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ * * This actor should be started once on each node where Active Active entities will run (the same nodes that you start * sharding on). The entities should be set up with [[akka.persistence.typed.scaladsl.EventSourcedBehavior#withEventPublishing()]] - * (FIXME not supported in Java yet) + * or [[akka.persistence.typed.javadsl.ActiveActiveEventSourcedBehavior#withEventPublishing()]] * If using [[ActiveActiveSharding]] the replication can be enabled through [[ActiveActiveShardingSettings#withDirectReplication()]] * instead of starting this actor manually. * diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingSettings.scala index 0f3b92cec4..09ae1fcb8c 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingSettings.scala @@ -67,9 +67,9 @@ final class ActiveActiveShardingSettings[M, E] private ( /** * Start direct replication over sharding when active active sharding starts up, requires the entities * to also have it enabled through [[akka.persistence.typed.scaladsl.EventSourcedBehavior#withEventPublishing()]] + * or [[akka.persistence.typed.javadsl.ActiveActiveEventSourcedBehavior#withEventPublishing()]] * to work. - * - * FIXME no support for enabling that in Java because bin/source comp. + */ def withDirectReplication(): ActiveActiveShardingSettings[M, E] = new ActiveActiveShardingSettings(replicas, directReplication = true) diff --git a/akka-docs/src/main/paradox/typed/persistence-active-active.md b/akka-docs/src/main/paradox/typed/persistence-active-active.md index bca97075c0..dcf79528b4 100644 --- a/akka-docs/src/main/paradox/typed/persistence-active-active.md +++ b/akka-docs/src/main/paradox/typed/persistence-active-active.md @@ -277,8 +277,8 @@ As an optimization the active active events can be published across the Akka clu query is still needed as delivery is not guaranteed, but can be configured to poll the database less often since most events will arrive at the replicas through the cluster. -To enable this feature you first need to enable event publishing on the `EventSourcedBehavior` with `withEventPublishing` -(FIXME missing Java API) and then enable direct replication through `withDirectReplication()` on @apidoc[ActiveActiveShardingSettings] (if not using +To enable this feature you first need to enable event publishing on the @scala[`EventSourcedBehavior`]@java[`ActiveActiveEventSourcedBehavior`] with `withEventPublishing` +and then enable direct replication through `withDirectReplication()` on @apidoc[ActiveActiveShardingSettings] (if not using active active sharding the replication can be run standalone by starting the @apidoc[ActiveActiveShardingDirectReplication] actor). The "event publishing" feature publishes each event to the local system event bus as a side effect after it has been written, diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcedBehavior.scala index d4ad096a74..9e315b04f1 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcedBehavior.scala @@ -9,11 +9,13 @@ import java.util.Optional import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior import akka.actor.typed.TypedActorContext +import akka.annotation.ApiMayChange import akka.annotation.InternalApi import akka.persistence.typed.internal import akka.persistence.typed.internal.EffectImpl import akka.persistence.typed.scaladsl.ActiveActiveContextImpl +@ApiMayChange abstract class ActiveActiveEventSourcedBehavior[Command, Event, State]( activeActiveContext: ActiveActiveContext, onPersistFailure: Optional[BackoffSupervisorStrategy]) @@ -21,6 +23,11 @@ abstract class ActiveActiveEventSourcedBehavior[Command, Event, State]( def this(activeActiveContext: ActiveActiveContext) = this(activeActiveContext, Optional.empty()) + /** + * Override and return true to publish events to the system event stream as [[akka.persistence.typed.PublishedEvent]] after they have been persisted + */ + def withEventPublishing: Boolean = false + protected def getActiveActiveContext(): ActiveActiveContext = activeActiveContext /** @@ -59,9 +66,13 @@ abstract class ActiveActiveEventSourcedBehavior[Command, Event, State]( if (handler.isEmpty) behavior else behavior.receiveSignal(handler.handler) - if (onPersistFailure.isPresent) - behaviorWithSignalHandler.onPersistFailure(onPersistFailure.get) - else - behaviorWithSignalHandler + val behaviorWithOnPersistFailure = + if (onPersistFailure.isPresent) + behaviorWithSignalHandler.onPersistFailure(onPersistFailure.get) + else + behaviorWithSignalHandler + + if (withEventPublishing) behaviorWithOnPersistFailure.withEventPublishing() + else behaviorWithOnPersistFailure } }