Fix AbstractPersistentActor ambiguity with Scala 2.12 #22218
This commit is contained in:
parent
64b35cf1f1
commit
8b5018ba21
5 changed files with 238 additions and 138 deletions
|
|
@ -478,6 +478,18 @@ They will receive ``Replicator.Deleted`` instead.
|
|||
Persistence
|
||||
===========
|
||||
|
||||
Binary incompatibility of PersistentActor and AtLeastOneDelivery
|
||||
----------------------------------------------------------------
|
||||
|
||||
To be able to evolve the Java APIs ``AbstractPersistentActor`` and ``AbstractPersistentActorWithAtLeastOnceDelivery``
|
||||
to work with Scala 2.12 we could find no other solution but to break the binary compatibility of the Scala versions
|
||||
(which the Java ones were based on).
|
||||
|
||||
This means that the Akka 2.5 artifact cannot be a class path drop in replacement of Akka 2.4 if you use
|
||||
``PersistentActor`` or ``AtLeastOnceDelivery``, to do this upgrade you _must_ recompile your project with the new
|
||||
version of Akka.
|
||||
|
||||
|
||||
Removal of PersistentView
|
||||
-------------------------
|
||||
|
||||
|
|
|
|||
|
|
@ -6,9 +6,11 @@ package akka.persistence
|
|||
import scala.collection.breakOut
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import akka.actor.{ ActorSelection, ActorPath, NotInfluenceReceiveTimeout }
|
||||
import akka.actor.{ ActorPath, ActorSelection, NotInfluenceReceiveTimeout }
|
||||
import akka.persistence.serialization.Message
|
||||
import akka.actor.Cancellable
|
||||
import akka.annotation.InternalApi
|
||||
import akka.persistence.AtLeastOnceDelivery.Internal.Delivery
|
||||
|
||||
object AtLeastOnceDelivery {
|
||||
|
||||
|
|
@ -73,7 +75,7 @@ object AtLeastOnceDelivery {
|
|||
}
|
||||
|
||||
/**
|
||||
* Mix-in this trait with your `PersistentActor` to send messages with at-least-once
|
||||
* Scala API: Mix-in this trait with your `PersistentActor` to send messages with at-least-once
|
||||
* delivery semantics to destinations. It takes care of re-sending messages when they
|
||||
* have not been confirmed within a configurable timeout. Use the [[AtLeastOnceDeliveryLike#deliver]] method to
|
||||
* send a message to a destination. Call the [[AtLeastOnceDeliveryLike#confirmDelivery]] method when the destination
|
||||
|
|
@ -104,8 +106,59 @@ object AtLeastOnceDelivery {
|
|||
* as a blob in your custom snapshot.
|
||||
*
|
||||
* @see [[AtLeastOnceDeliveryLike]]
|
||||
* @see [[AbstractPersistentActorWithAtLeastOnceDelivery]] for Java API
|
||||
*/
|
||||
trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike
|
||||
trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike {
|
||||
|
||||
/**
|
||||
* Scala API: Send the message created by the `deliveryIdToMessage` function to
|
||||
* the `destination` actor. It will retry sending the message until
|
||||
* the delivery is confirmed with [[#confirmDelivery]]. Correlation
|
||||
* between `deliver` and `confirmDelivery` is performed with the
|
||||
* `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
|
||||
* function. The `deliveryId` is typically passed in the message to the
|
||||
* destination, which replies with a message containing the same `deliveryId`.
|
||||
*
|
||||
* The `deliveryId` is a strictly monotonically increasing sequence number without
|
||||
* gaps. The same sequence is used for all destinations of the actor, i.e. when sending
|
||||
* to multiple destinations the destinations will see gaps in the sequence if no
|
||||
* translation is performed.
|
||||
*
|
||||
* During recovery this method will not send out the message, but it will be sent
|
||||
* later if no matching `confirmDelivery` was performed.
|
||||
*
|
||||
* This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
|
||||
* if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
|
||||
*/
|
||||
def deliver(destination: ActorPath)(deliveryIdToMessage: Long ⇒ Any): Unit = {
|
||||
internalDeliver(destination)(deliveryIdToMessage)
|
||||
}
|
||||
|
||||
/**
|
||||
* Scala API: Send the message created by the `deliveryIdToMessage` function to
|
||||
* the `destination` actor. It will retry sending the message until
|
||||
* the delivery is confirmed with [[#confirmDelivery]]. Correlation
|
||||
* between `deliver` and `confirmDelivery` is performed with the
|
||||
* `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
|
||||
* function. The `deliveryId` is typically passed in the message to the
|
||||
* destination, which replies with a message containing the same `deliveryId`.
|
||||
*
|
||||
* The `deliveryId` is a strictly monotonically increasing sequence number without
|
||||
* gaps. The same sequence is used for all destinations of the actor, i.e. when sending
|
||||
* to multiple destinations the destinations will see gaps in the sequence if no
|
||||
* translation is performed.
|
||||
*
|
||||
* During recovery this method will not send out the message, but it will be sent
|
||||
* later if no matching `confirmDelivery` was performed.
|
||||
*
|
||||
* This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
|
||||
* if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
|
||||
*/
|
||||
def deliver(destination: ActorSelection)(deliveryIdToMessage: Long ⇒ Any): Unit = {
|
||||
internalDeliver(destination)(deliveryIdToMessage)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @see [[AtLeastOnceDelivery]]
|
||||
|
|
@ -190,26 +243,10 @@ trait AtLeastOnceDeliveryLike extends Eventsourced {
|
|||
}
|
||||
|
||||
/**
|
||||
* Scala API: Send the message created by the `deliveryIdToMessage` function to
|
||||
* the `destination` actor. It will retry sending the message until
|
||||
* the delivery is confirmed with [[#confirmDelivery]]. Correlation
|
||||
* between `deliver` and `confirmDelivery` is performed with the
|
||||
* `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
|
||||
* function. The `deliveryId` is typically passed in the message to the
|
||||
* destination, which replies with a message containing the same `deliveryId`.
|
||||
*
|
||||
* The `deliveryId` is a strictly monotonically increasing sequence number without
|
||||
* gaps. The same sequence is used for all destinations of the actor, i.e. when sending
|
||||
* to multiple destinations the destinations will see gaps in the sequence if no
|
||||
* translation is performed.
|
||||
*
|
||||
* During recovery this method will not send out the message, but it will be sent
|
||||
* later if no matching `confirmDelivery` was performed.
|
||||
*
|
||||
* This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
|
||||
* if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
|
||||
* INTERNAL API
|
||||
*/
|
||||
def deliver(destination: ActorPath)(deliveryIdToMessage: Long ⇒ Any): Unit = {
|
||||
@InternalApi
|
||||
private[akka] final def internalDeliver(destination: ActorPath)(deliveryIdToMessage: Long ⇒ Any): Unit = {
|
||||
if (unconfirmed.size >= maxUnconfirmedMessages)
|
||||
throw new MaxUnconfirmedMessagesExceededException(
|
||||
s"Too many unconfirmed messages, maximum allowed is [$maxUnconfirmedMessages]")
|
||||
|
|
@ -225,31 +262,15 @@ trait AtLeastOnceDeliveryLike extends Eventsourced {
|
|||
}
|
||||
|
||||
/**
|
||||
* Scala API: Send the message created by the `deliveryIdToMessage` function to
|
||||
* the `destination` actor. It will retry sending the message until
|
||||
* the delivery is confirmed with [[#confirmDelivery]]. Correlation
|
||||
* between `deliver` and `confirmDelivery` is performed with the
|
||||
* `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
|
||||
* function. The `deliveryId` is typically passed in the message to the
|
||||
* destination, which replies with a message containing the same `deliveryId`.
|
||||
*
|
||||
* The `deliveryId` is a strictly monotonically increasing sequence number without
|
||||
* gaps. The same sequence is used for all destinations of the actor, i.e. when sending
|
||||
* to multiple destinations the destinations will see gaps in the sequence if no
|
||||
* translation is performed.
|
||||
*
|
||||
* During recovery this method will not send out the message, but it will be sent
|
||||
* later if no matching `confirmDelivery` was performed.
|
||||
*
|
||||
* This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
|
||||
* if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
|
||||
* INTERNAL API
|
||||
*/
|
||||
def deliver(destination: ActorSelection)(deliveryIdToMessage: Long ⇒ Any): Unit = {
|
||||
@InternalApi
|
||||
private[akka] final def internalDeliver(destination: ActorSelection)(deliveryIdToMessage: Long ⇒ Any): Unit = {
|
||||
val isWildcardSelection = destination.pathString.contains("*")
|
||||
require(!isWildcardSelection, "Delivering to wildcard actor selections is not supported by AtLeastOnceDelivery. " +
|
||||
"Introduce an mediator Actor which this AtLeastOnceDelivery Actor will deliver the messages to," +
|
||||
"and will handle the logic of fan-out and collecting individual confirmations, until it can signal confirmation back to this Actor.")
|
||||
deliver(ActorPath.fromString(destination.toSerializationFormat))(deliveryIdToMessage)
|
||||
internalDeliver(ActorPath.fromString(destination.toSerializationFormat))(deliveryIdToMessage)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -389,7 +410,7 @@ abstract class UntypedPersistentActorWithAtLeastOnceDelivery extends UntypedPers
|
|||
* if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
|
||||
*/
|
||||
def deliver(destination: ActorPath, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit =
|
||||
super.deliver(destination)(id ⇒ deliveryIdToMessage.apply(id))
|
||||
internalDeliver(destination)(id ⇒ deliveryIdToMessage.apply(id))
|
||||
|
||||
/**
|
||||
* Java API: Send the message created by the `deliveryIdToMessage` function to
|
||||
|
|
@ -412,13 +433,13 @@ abstract class UntypedPersistentActorWithAtLeastOnceDelivery extends UntypedPers
|
|||
* if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
|
||||
*/
|
||||
def deliver(destination: ActorSelection, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit =
|
||||
super.deliver(destination)(id ⇒ deliveryIdToMessage.apply(id))
|
||||
internalDeliver(destination)(id ⇒ deliveryIdToMessage.apply(id))
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API compatible with lambda expressions
|
||||
* Java API: compatible with lambda expressions
|
||||
*
|
||||
* Use this class instead of `UntypedPersistentActor` to send messages
|
||||
* Use this class instead of `AbstractPersistentActor` to send messages
|
||||
* with at-least-once delivery semantics to destinations.
|
||||
* Full documentation in [[AtLeastOnceDelivery]].
|
||||
*
|
||||
|
|
@ -447,7 +468,7 @@ abstract class AbstractPersistentActorWithAtLeastOnceDelivery extends AbstractPe
|
|||
* if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
|
||||
*/
|
||||
def deliver(destination: ActorPath, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit =
|
||||
super.deliver(destination)(id ⇒ deliveryIdToMessage.apply(id))
|
||||
internalDeliver(destination)(id ⇒ deliveryIdToMessage.apply(id))
|
||||
|
||||
/**
|
||||
* Java API: Send the message created by the `deliveryIdToMessage` function to
|
||||
|
|
@ -470,5 +491,5 @@ abstract class AbstractPersistentActorWithAtLeastOnceDelivery extends AbstractPe
|
|||
* if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
|
||||
*/
|
||||
def deliver(destination: ActorSelection, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit =
|
||||
super.deliver(destination)(id ⇒ deliveryIdToMessage.apply(id))
|
||||
internalDeliver(destination)(id ⇒ deliveryIdToMessage.apply(id))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import java.util.UUID
|
|||
import scala.collection.immutable
|
||||
import scala.util.control.NonFatal
|
||||
import akka.actor.{ DeadLetter, StashOverflowException }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import akka.event.Logging
|
||||
import akka.event.LoggingAdapter
|
||||
|
|
@ -292,31 +293,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
|
|||
def receiveCommand: Receive
|
||||
|
||||
/**
|
||||
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
|
||||
* persisted event. It is guaranteed that no new commands will be received by a persistent actor
|
||||
* between a call to `persist` and the execution of its `handler`. This also holds for
|
||||
* multiple `persist` calls per received command. Internally, this is achieved by stashing new
|
||||
* commands and unstashing them when the `event` has been persisted and handled. The stash used
|
||||
* for that is an internal stash which doesn't interfere with the inherited user stash.
|
||||
*
|
||||
* An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
|
||||
* event is the sender of the corresponding command. This means that one can reply to a command
|
||||
* sender within an event `handler`.
|
||||
*
|
||||
* Within an event handler, applications usually update persistent actor state using persisted event
|
||||
* data, notify listeners and reply to command senders.
|
||||
*
|
||||
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
|
||||
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
|
||||
* is unknown if the even was actually persisted or not, and therefore it is in an inconsistent
|
||||
* state. Restarting on persistent failures will most likely fail anyway, since the journal
|
||||
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
|
||||
* it again.
|
||||
*
|
||||
* @param event event to be persisted
|
||||
* @param handler handler for each persisted `event`
|
||||
* Internal API
|
||||
*/
|
||||
def persist[A](event: A)(handler: A ⇒ Unit): Unit = {
|
||||
@InternalApi
|
||||
final private[akka] def internalPersist[A](event: A)(handler: A ⇒ Unit): Unit = {
|
||||
if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.")
|
||||
pendingStashingPersistInvocations += 1
|
||||
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
|
||||
|
|
@ -325,14 +305,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
|
|||
}
|
||||
|
||||
/**
|
||||
* Asynchronously persists `events` in specified order. This is equivalent to calling
|
||||
* `persist[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
|
||||
* except that `events` are persisted atomically with this method.
|
||||
*
|
||||
* @param events events to be persisted
|
||||
* @param handler handler for each persisted `events`
|
||||
* Internal API
|
||||
*/
|
||||
def persistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
|
||||
@InternalApi
|
||||
final private[akka] def internalPersistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
|
||||
if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.")
|
||||
if (events.nonEmpty) {
|
||||
events.foreach { event ⇒
|
||||
|
|
@ -345,29 +321,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
|
|||
}
|
||||
|
||||
/**
|
||||
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
|
||||
* persisted event.
|
||||
*
|
||||
* Unlike `persist` the persistent actor will continue to receive incoming commands between the
|
||||
* call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of
|
||||
* of persist should be used when you favor throughput over the "command-2 only processed after
|
||||
* command-1 effects' have been applied" guarantee, which is provided by the plain `persist` method.
|
||||
*
|
||||
* An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
|
||||
* event is the sender of the corresponding command. This means that one can reply to a command
|
||||
* sender within an event `handler`.
|
||||
*
|
||||
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
|
||||
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
|
||||
* is unknown if the even was actually persisted or not, and therefore it is in an inconsistent
|
||||
* state. Restarting on persistent failures will most likely fail anyway, since the journal
|
||||
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
|
||||
* it again.
|
||||
*
|
||||
* @param event event to be persisted
|
||||
* @param handler handler for each persisted `event`
|
||||
* Internal API
|
||||
*/
|
||||
def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
|
||||
@InternalApi
|
||||
final private[akka] def internalPersistAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
|
||||
if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.")
|
||||
pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
|
||||
eventBatch ::= AtomicWrite(PersistentRepr(event, persistenceId = persistenceId,
|
||||
|
|
@ -375,14 +332,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
|
|||
}
|
||||
|
||||
/**
|
||||
* Asynchronously persists `events` in specified order. This is equivalent to calling
|
||||
* `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
|
||||
* except that `events` are persisted atomically with this method.
|
||||
*
|
||||
* @param events events to be persisted
|
||||
* @param handler handler for each persisted `events`
|
||||
* Internal API
|
||||
*/
|
||||
def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
|
||||
@InternalApi
|
||||
final private[akka] def internalPersistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
|
||||
if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.")
|
||||
if (events.nonEmpty) {
|
||||
events.foreach { event ⇒
|
||||
|
|
@ -394,23 +347,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
|
|||
}
|
||||
|
||||
/**
|
||||
* Defer the handler execution until all pending handlers have been executed.
|
||||
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
|
||||
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before `deferAsync`,
|
||||
* the corresponding handlers will be invoked in the same order as they were registered in.
|
||||
*
|
||||
* This call will NOT result in `event` being persisted, use `persist` or `persistAsync` instead
|
||||
* if the given event should possible to replay.
|
||||
*
|
||||
* If there are no pending persist handler calls, the handler will be called immediately.
|
||||
*
|
||||
* If persistence of an earlier event fails, the persistent actor will stop, and the `handler`
|
||||
* will not be run.
|
||||
*
|
||||
* @param event event to be handled in the future, when preceding persist operations have been processes
|
||||
* @param handler handler for the given `event`
|
||||
* Internal API
|
||||
*/
|
||||
def deferAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
|
||||
@InternalApi
|
||||
final private[akka] def internalDeferAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
|
||||
if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.")
|
||||
if (pendingInvocations.isEmpty) {
|
||||
handler(event)
|
||||
|
|
|
|||
|
|
@ -8,8 +8,10 @@ import java.lang.{ Iterable ⇒ JIterable }
|
|||
import akka.actor._
|
||||
import akka.japi.Procedure
|
||||
import akka.japi.Util
|
||||
import akka.persistence.Eventsourced.{ AsyncHandlerInvocation, StashingHandlerInvocation }
|
||||
import com.typesafe.config.Config
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
abstract class RecoveryCompleted
|
||||
|
|
@ -156,10 +158,111 @@ final class DiscardConfigurator extends StashOverflowStrategyConfigurator {
|
|||
}
|
||||
|
||||
/**
|
||||
* An persistent Actor - can be used to implement command or event sourcing.
|
||||
* Scala API: A persistent Actor - can be used to implement command or event sourcing.
|
||||
*/
|
||||
trait PersistentActor extends Eventsourced with PersistenceIdentity {
|
||||
def receive = receiveCommand
|
||||
|
||||
/**
|
||||
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
|
||||
* persisted event. It is guaranteed that no new commands will be received by a persistent actor
|
||||
* between a call to `persist` and the execution of its `handler`. This also holds for
|
||||
* multiple `persist` calls per received command. Internally, this is achieved by stashing new
|
||||
* commands and unstashing them when the `event` has been persisted and handled. The stash used
|
||||
* for that is an internal stash which doesn't interfere with the inherited user stash.
|
||||
*
|
||||
* An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
|
||||
* event is the sender of the corresponding command. This means that one can reply to a command
|
||||
* sender within an event `handler`.
|
||||
*
|
||||
* Within an event handler, applications usually update persistent actor state using persisted event
|
||||
* data, notify listeners and reply to command senders.
|
||||
*
|
||||
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
|
||||
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
|
||||
* is unknown if the even was actually persisted or not, and therefore it is in an inconsistent
|
||||
* state. Restarting on persistent failures will most likely fail anyway, since the journal
|
||||
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
|
||||
* it again.
|
||||
*
|
||||
* @param event event to be persisted
|
||||
* @param handler handler for each persisted `event`
|
||||
*/
|
||||
def persist[A](event: A)(handler: A ⇒ Unit): Unit = {
|
||||
internalPersist(event)(handler)
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronously persists `events` in specified order. This is equivalent to calling
|
||||
* `persist[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
|
||||
* except that `events` are persisted atomically with this method.
|
||||
*
|
||||
* @param events events to be persisted
|
||||
* @param handler handler for each persisted `events`
|
||||
*/
|
||||
def persistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
|
||||
internalPersistAll(events)(handler)
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
|
||||
* persisted event.
|
||||
*
|
||||
* Unlike `persist` the persistent actor will continue to receive incoming commands between the
|
||||
* call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of
|
||||
* of persist should be used when you favor throughput over the "command-2 only processed after
|
||||
* command-1 effects' have been applied" guarantee, which is provided by the plain `persist` method.
|
||||
*
|
||||
* An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
|
||||
* event is the sender of the corresponding command. This means that one can reply to a command
|
||||
* sender within an event `handler`.
|
||||
*
|
||||
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
|
||||
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
|
||||
* is unknown if the even was actually persisted or not, and therefore it is in an inconsistent
|
||||
* state. Restarting on persistent failures will most likely fail anyway, since the journal
|
||||
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
|
||||
* it again.
|
||||
*
|
||||
* @param event event to be persisted
|
||||
* @param handler handler for each persisted `event`
|
||||
*/
|
||||
def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
|
||||
internalPersistAsync(event)(handler)
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronously persists `events` in specified order. This is equivalent to calling
|
||||
* `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
|
||||
* except that `events` are persisted atomically with this method.
|
||||
*
|
||||
* @param events events to be persisted
|
||||
* @param handler handler for each persisted `events`
|
||||
*/
|
||||
def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
|
||||
internalPersistAllAsync(events)(handler)
|
||||
}
|
||||
|
||||
/**
|
||||
* Defer the handler execution until all pending handlers have been executed.
|
||||
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
|
||||
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before `deferAsync`,
|
||||
* the corresponding handlers will be invoked in the same order as they were registered in.
|
||||
*
|
||||
* This call will NOT result in `event` being persisted, use `persist` or `persistAsync` instead
|
||||
* if the given event should possible to replay.
|
||||
*
|
||||
* If there are no pending persist handler calls, the handler will be called immediately.
|
||||
*
|
||||
* If persistence of an earlier event fails, the persistent actor will stop, and the `handler`
|
||||
* will not be run.
|
||||
*
|
||||
* @param event event to be handled in the future, when preceding persist operations have been processes
|
||||
* @param handler handler for the given `event`
|
||||
*/
|
||||
def deferAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
|
||||
internalDeferAsync(event)(handler)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -204,7 +307,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit
|
|||
* @param handler handler for each persisted `event`
|
||||
*/
|
||||
def persist[A](event: A, handler: Procedure[A]): Unit =
|
||||
persist(event)(event ⇒ handler(event))
|
||||
internalPersist(event)(event ⇒ handler(event))
|
||||
|
||||
/**
|
||||
* Java API: asynchronously persists `events` in specified order. This is equivalent to calling
|
||||
|
|
@ -215,7 +318,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit
|
|||
* @param handler handler for each persisted `events`
|
||||
*/
|
||||
def persistAll[A](events: JIterable[A], handler: Procedure[A]): Unit =
|
||||
persistAll(Util.immutableSeq(events))(event ⇒ handler(event))
|
||||
internalPersistAll(Util.immutableSeq(events))(event ⇒ handler(event))
|
||||
|
||||
/**
|
||||
* JAVA API: asynchronously persists `event`. On successful persistence, `handler` is called with the
|
||||
|
|
@ -241,7 +344,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit
|
|||
* @param handler handler for each persisted `event`
|
||||
*/
|
||||
def persistAsync[A](event: A)(handler: Procedure[A]): Unit =
|
||||
super[Eventsourced].persistAsync(event)(event ⇒ handler(event))
|
||||
internalPersistAsync(event)(event ⇒ handler(event))
|
||||
|
||||
/**
|
||||
* JAVA API: asynchronously persists `events` in specified order. This is equivalent to calling
|
||||
|
|
@ -252,7 +355,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit
|
|||
* @param handler handler for each persisted `events`
|
||||
*/
|
||||
def persistAllAsync[A](events: JIterable[A], handler: Procedure[A]): Unit =
|
||||
super[Eventsourced].persistAllAsync(Util.immutableSeq(events))(event ⇒ handler(event))
|
||||
internalPersistAllAsync(Util.immutableSeq(events))(event ⇒ handler(event))
|
||||
|
||||
/**
|
||||
* Defer the handler execution until all pending handlers have been executed.
|
||||
|
|
@ -272,7 +375,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit
|
|||
* @param handler handler for the given `event`
|
||||
*/
|
||||
def deferAsync[A](event: A)(handler: Procedure[A]): Unit =
|
||||
super[Eventsourced].deferAsync(event)(event ⇒ handler(event))
|
||||
internalDeferAsync(event)(event ⇒ handler(event))
|
||||
|
||||
/**
|
||||
* Java API: recovery handler that receives persisted events during recovery. If a state snapshot
|
||||
|
|
@ -303,7 +406,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit
|
|||
/**
|
||||
* Java API: an persistent actor - can be used to implement command or event sourcing.
|
||||
*/
|
||||
abstract class AbstractPersistentActor extends AbstractActor with PersistentActor with Eventsourced {
|
||||
abstract class AbstractPersistentActor extends AbstractActor with Eventsourced {
|
||||
|
||||
/**
|
||||
* Recovery handler that receives persisted events during recovery. If a state snapshot
|
||||
|
|
@ -360,7 +463,7 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo
|
|||
* @param handler handler for each persisted `event`
|
||||
*/
|
||||
def persist[A](event: A, handler: Procedure[A]): Unit =
|
||||
persist(event)(event ⇒ handler(event))
|
||||
internalPersist(event)(event ⇒ handler(event))
|
||||
|
||||
/**
|
||||
* Java API: asynchronously persists `events` in specified order. This is equivalent to calling
|
||||
|
|
@ -371,7 +474,7 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo
|
|||
* @param handler handler for each persisted `events`
|
||||
*/
|
||||
def persistAll[A](events: JIterable[A], handler: Procedure[A]): Unit =
|
||||
persistAll(Util.immutableSeq(events))(event ⇒ handler(event))
|
||||
internalPersistAll(Util.immutableSeq(events))(event ⇒ handler(event))
|
||||
|
||||
/**
|
||||
* Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
|
||||
|
|
@ -392,7 +495,7 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo
|
|||
* @param handler handler for each persisted `event`
|
||||
*/
|
||||
def persistAsync[A](event: A, handler: Procedure[A]): Unit =
|
||||
persistAsync(event)(event ⇒ handler(event))
|
||||
internalPersistAsync(event)(event ⇒ handler(event))
|
||||
|
||||
/**
|
||||
* Java API: asynchronously persists `events` in specified order. This is equivalent to calling
|
||||
|
|
@ -403,7 +506,7 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo
|
|||
* @param handler handler for each persisted `events`
|
||||
*/
|
||||
def persistAllAsync[A](events: JIterable[A], handler: Procedure[A]): Unit =
|
||||
persistAllAsync(Util.immutableSeq(events))(event ⇒ handler(event))
|
||||
internalPersistAllAsync(Util.immutableSeq(events))(event ⇒ handler(event))
|
||||
|
||||
/**
|
||||
* Defer the handler execution until all pending handlers have been executed.
|
||||
|
|
@ -423,9 +526,7 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo
|
|||
* @param handler handler for the given `event`
|
||||
*/
|
||||
def deferAsync[A](event: A)(handler: Procedure[A]): Unit =
|
||||
super.deferAsync(event)(event ⇒ handler(event))
|
||||
|
||||
override def receive = super[PersistentActor].receive
|
||||
internalDeferAsync(event)(event ⇒ handler(event))
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -435,6 +435,32 @@ object MiMa extends AutoPlugin {
|
|||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.DurableStore#Store.this"),
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.LmdbDurableStore.dbPut"),
|
||||
|
||||
// #22218 Java Ambiguity in AbstractPersistentActor with Scala 2.12
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.UntypedPersistentActor.deferAsync"),
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.UntypedPersistentActor.persistAllAsync"),
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.UntypedPersistentActor.persistAll"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.Eventsourced.deferAsync"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.Eventsourced.persistAllAsync"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.Eventsourced.persistAll"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalPersistAsync"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalPersist"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalPersistAll"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalDeferAsync"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalPersistAllAsync"),
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.UntypedPersistentActorWithAtLeastOnceDelivery.deliver"),
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.UntypedPersistentActorWithAtLeastOnceDelivery.deliver"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.AtLeastOnceDeliveryLike.deliver"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.AtLeastOnceDeliveryLike.deliver"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.AtLeastOnceDeliveryLike.internalDeliver"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.AtLeastOnceDeliveryLike.internalDeliver"),
|
||||
ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.AbstractPersistentActorWithAtLeastOnceDelivery"),
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.AbstractPersistentActorWithAtLeastOnceDelivery.deliver"),
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.AbstractPersistentActorWithAtLeastOnceDelivery.deliver"),
|
||||
ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.AbstractPersistentActor"),
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.AbstractPersistentActor.deferAsync"),
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.AbstractPersistentActor.persistAllAsync"),
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.AbstractPersistentActor.persistAll"),
|
||||
|
||||
// #22208 remove extension key
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.event.Logging$Extension$")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue