EventPublishing API for Java (#29413)

This commit is contained in:
Johan Andrén 2020-07-22 12:26:36 +02:00 committed by Christopher Batey
parent 165b39d1e0
commit 827dc14dbb
4 changed files with 20 additions and 9 deletions

View file

@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
* *
* This actor should be started once on each node where Active Active entities will run (the same nodes that you start * This actor should be started once on each node where Active Active entities will run (the same nodes that you start
* sharding on). The entities should be set up with [[akka.persistence.typed.scaladsl.EventSourcedBehavior#withEventPublishing()]] * sharding on). The entities should be set up with [[akka.persistence.typed.scaladsl.EventSourcedBehavior#withEventPublishing()]]
* (FIXME not supported in Java yet) * or [[akka.persistence.typed.javadsl.ActiveActiveEventSourcedBehavior#withEventPublishing()]]
* If using [[ActiveActiveSharding]] the replication can be enabled through [[ActiveActiveShardingSettings#withDirectReplication()]] * If using [[ActiveActiveSharding]] the replication can be enabled through [[ActiveActiveShardingSettings#withDirectReplication()]]
* instead of starting this actor manually. * instead of starting this actor manually.
* *

View file

@ -67,9 +67,9 @@ final class ActiveActiveShardingSettings[M, E] private (
/** /**
* Start direct replication over sharding when active active sharding starts up, requires the entities * Start direct replication over sharding when active active sharding starts up, requires the entities
* to also have it enabled through [[akka.persistence.typed.scaladsl.EventSourcedBehavior#withEventPublishing()]] * to also have it enabled through [[akka.persistence.typed.scaladsl.EventSourcedBehavior#withEventPublishing()]]
* or [[akka.persistence.typed.javadsl.ActiveActiveEventSourcedBehavior#withEventPublishing()]]
* to work. * to work.
*
* FIXME no support for enabling that in Java because bin/source comp.
*/ */
def withDirectReplication(): ActiveActiveShardingSettings[M, E] = def withDirectReplication(): ActiveActiveShardingSettings[M, E] =
new ActiveActiveShardingSettings(replicas, directReplication = true) new ActiveActiveShardingSettings(replicas, directReplication = true)

View file

@ -277,8 +277,8 @@ As an optimization the active active events can be published across the Akka clu
query is still needed as delivery is not guaranteed, but can be configured to poll the database less often since most query is still needed as delivery is not guaranteed, but can be configured to poll the database less often since most
events will arrive at the replicas through the cluster. events will arrive at the replicas through the cluster.
To enable this feature you first need to enable event publishing on the `EventSourcedBehavior` with `withEventPublishing` To enable this feature you first need to enable event publishing on the @scala[`EventSourcedBehavior`]@java[`ActiveActiveEventSourcedBehavior`] with `withEventPublishing`
(FIXME missing Java API) and then enable direct replication through `withDirectReplication()` on @apidoc[ActiveActiveShardingSettings] (if not using and then enable direct replication through `withDirectReplication()` on @apidoc[ActiveActiveShardingSettings] (if not using
active active sharding the replication can be run standalone by starting the @apidoc[ActiveActiveShardingDirectReplication] actor). active active sharding the replication can be run standalone by starting the @apidoc[ActiveActiveShardingDirectReplication] actor).
The "event publishing" feature publishes each event to the local system event bus as a side effect after it has been written, The "event publishing" feature publishes each event to the local system event bus as a side effect after it has been written,

View file

@ -9,11 +9,13 @@ import java.util.Optional
import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.TypedActorContext import akka.actor.typed.TypedActorContext
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence.typed.internal import akka.persistence.typed.internal
import akka.persistence.typed.internal.EffectImpl import akka.persistence.typed.internal.EffectImpl
import akka.persistence.typed.scaladsl.ActiveActiveContextImpl import akka.persistence.typed.scaladsl.ActiveActiveContextImpl
@ApiMayChange
abstract class ActiveActiveEventSourcedBehavior[Command, Event, State]( abstract class ActiveActiveEventSourcedBehavior[Command, Event, State](
activeActiveContext: ActiveActiveContext, activeActiveContext: ActiveActiveContext,
onPersistFailure: Optional[BackoffSupervisorStrategy]) onPersistFailure: Optional[BackoffSupervisorStrategy])
@ -21,6 +23,11 @@ abstract class ActiveActiveEventSourcedBehavior[Command, Event, State](
def this(activeActiveContext: ActiveActiveContext) = this(activeActiveContext, Optional.empty()) def this(activeActiveContext: ActiveActiveContext) = this(activeActiveContext, Optional.empty())
/**
* Override and return true to publish events to the system event stream as [[akka.persistence.typed.PublishedEvent]] after they have been persisted
*/
def withEventPublishing: Boolean = false
protected def getActiveActiveContext(): ActiveActiveContext = activeActiveContext protected def getActiveActiveContext(): ActiveActiveContext = activeActiveContext
/** /**
@ -59,9 +66,13 @@ abstract class ActiveActiveEventSourcedBehavior[Command, Event, State](
if (handler.isEmpty) behavior if (handler.isEmpty) behavior
else behavior.receiveSignal(handler.handler) else behavior.receiveSignal(handler.handler)
val behaviorWithOnPersistFailure =
if (onPersistFailure.isPresent) if (onPersistFailure.isPresent)
behaviorWithSignalHandler.onPersistFailure(onPersistFailure.get) behaviorWithSignalHandler.onPersistFailure(onPersistFailure.get)
else else
behaviorWithSignalHandler behaviorWithSignalHandler
if (withEventPublishing) behaviorWithOnPersistFailure.withEventPublishing()
else behaviorWithOnPersistFailure
} }
} }