From 8b5018ba21cf2651cb74fa7ef2b2bc2cea175bff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 20 Feb 2017 14:50:25 +0100 Subject: [PATCH] Fix AbstractPersistentActor ambiguity with Scala 2.12 #22218 --- .../project/migration-guide-2.4.x-2.5.x.rst | 12 ++ .../persistence/AtLeastOnceDelivery.scala | 117 +++++++++------- .../scala/akka/persistence/Eventsourced.scala | 92 +++---------- .../akka/persistence/PersistentActor.scala | 129 ++++++++++++++++-- project/MiMa.scala | 26 ++++ 5 files changed, 238 insertions(+), 138 deletions(-) diff --git a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst index 7c8aa4b755..94be8aa263 100644 --- a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst +++ b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst @@ -478,6 +478,18 @@ They will receive ``Replicator.Deleted`` instead. Persistence =========== +Binary incompatibility of PersistentActor and AtLeastOneDelivery +---------------------------------------------------------------- + +To be able to evolve the Java APIs ``AbstractPersistentActor`` and ``AbstractPersistentActorWithAtLeastOnceDelivery`` +to work with Scala 2.12 we could find no other solution but to break the binary compatibility of the Scala versions +(which the Java ones were based on). + +This means that the Akka 2.5 artifact cannot be a class path drop in replacement of Akka 2.4 if you use +``PersistentActor`` or ``AtLeastOnceDelivery``, to do this upgrade you _must_ recompile your project with the new +version of Akka. + + Removal of PersistentView ------------------------- diff --git a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala index 4e58b5de01..0e6b144ac2 100644 --- a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala @@ -6,9 +6,11 @@ package akka.persistence import scala.collection.breakOut import scala.collection.immutable import scala.concurrent.duration.FiniteDuration -import akka.actor.{ ActorSelection, ActorPath, NotInfluenceReceiveTimeout } +import akka.actor.{ ActorPath, ActorSelection, NotInfluenceReceiveTimeout } import akka.persistence.serialization.Message import akka.actor.Cancellable +import akka.annotation.InternalApi +import akka.persistence.AtLeastOnceDelivery.Internal.Delivery object AtLeastOnceDelivery { @@ -73,7 +75,7 @@ object AtLeastOnceDelivery { } /** - * Mix-in this trait with your `PersistentActor` to send messages with at-least-once + * Scala API: Mix-in this trait with your `PersistentActor` to send messages with at-least-once * delivery semantics to destinations. It takes care of re-sending messages when they * have not been confirmed within a configurable timeout. Use the [[AtLeastOnceDeliveryLike#deliver]] method to * send a message to a destination. Call the [[AtLeastOnceDeliveryLike#confirmDelivery]] method when the destination @@ -104,8 +106,59 @@ object AtLeastOnceDelivery { * as a blob in your custom snapshot. * * @see [[AtLeastOnceDeliveryLike]] + * @see [[AbstractPersistentActorWithAtLeastOnceDelivery]] for Java API */ -trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike +trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike { + + /** + * Scala API: Send the message created by the `deliveryIdToMessage` function to + * the `destination` actor. It will retry sending the message until + * the delivery is confirmed with [[#confirmDelivery]]. Correlation + * between `deliver` and `confirmDelivery` is performed with the + * `deliveryId` that is provided as parameter to the `deliveryIdToMessage` + * function. The `deliveryId` is typically passed in the message to the + * destination, which replies with a message containing the same `deliveryId`. + * + * The `deliveryId` is a strictly monotonically increasing sequence number without + * gaps. The same sequence is used for all destinations of the actor, i.e. when sending + * to multiple destinations the destinations will see gaps in the sequence if no + * translation is performed. + * + * During recovery this method will not send out the message, but it will be sent + * later if no matching `confirmDelivery` was performed. + * + * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]] + * if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. + */ + def deliver(destination: ActorPath)(deliveryIdToMessage: Long ⇒ Any): Unit = { + internalDeliver(destination)(deliveryIdToMessage) + } + + /** + * Scala API: Send the message created by the `deliveryIdToMessage` function to + * the `destination` actor. It will retry sending the message until + * the delivery is confirmed with [[#confirmDelivery]]. Correlation + * between `deliver` and `confirmDelivery` is performed with the + * `deliveryId` that is provided as parameter to the `deliveryIdToMessage` + * function. The `deliveryId` is typically passed in the message to the + * destination, which replies with a message containing the same `deliveryId`. + * + * The `deliveryId` is a strictly monotonically increasing sequence number without + * gaps. The same sequence is used for all destinations of the actor, i.e. when sending + * to multiple destinations the destinations will see gaps in the sequence if no + * translation is performed. + * + * During recovery this method will not send out the message, but it will be sent + * later if no matching `confirmDelivery` was performed. + * + * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]] + * if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. + */ + def deliver(destination: ActorSelection)(deliveryIdToMessage: Long ⇒ Any): Unit = { + internalDeliver(destination)(deliveryIdToMessage) + } + +} /** * @see [[AtLeastOnceDelivery]] @@ -190,26 +243,10 @@ trait AtLeastOnceDeliveryLike extends Eventsourced { } /** - * Scala API: Send the message created by the `deliveryIdToMessage` function to - * the `destination` actor. It will retry sending the message until - * the delivery is confirmed with [[#confirmDelivery]]. Correlation - * between `deliver` and `confirmDelivery` is performed with the - * `deliveryId` that is provided as parameter to the `deliveryIdToMessage` - * function. The `deliveryId` is typically passed in the message to the - * destination, which replies with a message containing the same `deliveryId`. - * - * The `deliveryId` is a strictly monotonically increasing sequence number without - * gaps. The same sequence is used for all destinations of the actor, i.e. when sending - * to multiple destinations the destinations will see gaps in the sequence if no - * translation is performed. - * - * During recovery this method will not send out the message, but it will be sent - * later if no matching `confirmDelivery` was performed. - * - * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]] - * if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. + * INTERNAL API */ - def deliver(destination: ActorPath)(deliveryIdToMessage: Long ⇒ Any): Unit = { + @InternalApi + private[akka] final def internalDeliver(destination: ActorPath)(deliveryIdToMessage: Long ⇒ Any): Unit = { if (unconfirmed.size >= maxUnconfirmedMessages) throw new MaxUnconfirmedMessagesExceededException( s"Too many unconfirmed messages, maximum allowed is [$maxUnconfirmedMessages]") @@ -225,31 +262,15 @@ trait AtLeastOnceDeliveryLike extends Eventsourced { } /** - * Scala API: Send the message created by the `deliveryIdToMessage` function to - * the `destination` actor. It will retry sending the message until - * the delivery is confirmed with [[#confirmDelivery]]. Correlation - * between `deliver` and `confirmDelivery` is performed with the - * `deliveryId` that is provided as parameter to the `deliveryIdToMessage` - * function. The `deliveryId` is typically passed in the message to the - * destination, which replies with a message containing the same `deliveryId`. - * - * The `deliveryId` is a strictly monotonically increasing sequence number without - * gaps. The same sequence is used for all destinations of the actor, i.e. when sending - * to multiple destinations the destinations will see gaps in the sequence if no - * translation is performed. - * - * During recovery this method will not send out the message, but it will be sent - * later if no matching `confirmDelivery` was performed. - * - * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]] - * if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. + * INTERNAL API */ - def deliver(destination: ActorSelection)(deliveryIdToMessage: Long ⇒ Any): Unit = { + @InternalApi + private[akka] final def internalDeliver(destination: ActorSelection)(deliveryIdToMessage: Long ⇒ Any): Unit = { val isWildcardSelection = destination.pathString.contains("*") require(!isWildcardSelection, "Delivering to wildcard actor selections is not supported by AtLeastOnceDelivery. " + "Introduce an mediator Actor which this AtLeastOnceDelivery Actor will deliver the messages to," + "and will handle the logic of fan-out and collecting individual confirmations, until it can signal confirmation back to this Actor.") - deliver(ActorPath.fromString(destination.toSerializationFormat))(deliveryIdToMessage) + internalDeliver(ActorPath.fromString(destination.toSerializationFormat))(deliveryIdToMessage) } /** @@ -389,7 +410,7 @@ abstract class UntypedPersistentActorWithAtLeastOnceDelivery extends UntypedPers * if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. */ def deliver(destination: ActorPath, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit = - super.deliver(destination)(id ⇒ deliveryIdToMessage.apply(id)) + internalDeliver(destination)(id ⇒ deliveryIdToMessage.apply(id)) /** * Java API: Send the message created by the `deliveryIdToMessage` function to @@ -412,13 +433,13 @@ abstract class UntypedPersistentActorWithAtLeastOnceDelivery extends UntypedPers * if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. */ def deliver(destination: ActorSelection, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit = - super.deliver(destination)(id ⇒ deliveryIdToMessage.apply(id)) + internalDeliver(destination)(id ⇒ deliveryIdToMessage.apply(id)) } /** - * Java API compatible with lambda expressions + * Java API: compatible with lambda expressions * - * Use this class instead of `UntypedPersistentActor` to send messages + * Use this class instead of `AbstractPersistentActor` to send messages * with at-least-once delivery semantics to destinations. * Full documentation in [[AtLeastOnceDelivery]]. * @@ -447,7 +468,7 @@ abstract class AbstractPersistentActorWithAtLeastOnceDelivery extends AbstractPe * if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. */ def deliver(destination: ActorPath, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit = - super.deliver(destination)(id ⇒ deliveryIdToMessage.apply(id)) + internalDeliver(destination)(id ⇒ deliveryIdToMessage.apply(id)) /** * Java API: Send the message created by the `deliveryIdToMessage` function to @@ -470,5 +491,5 @@ abstract class AbstractPersistentActorWithAtLeastOnceDelivery extends AbstractPe * if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. */ def deliver(destination: ActorSelection, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit = - super.deliver(destination)(id ⇒ deliveryIdToMessage.apply(id)) + internalDeliver(destination)(id ⇒ deliveryIdToMessage.apply(id)) } diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index acd231bba7..1dc5327726 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -10,6 +10,7 @@ import java.util.UUID import scala.collection.immutable import scala.util.control.NonFatal import akka.actor.{ DeadLetter, StashOverflowException } +import akka.annotation.InternalApi import akka.util.Helpers.ConfigOps import akka.event.Logging import akka.event.LoggingAdapter @@ -292,31 +293,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas def receiveCommand: Receive /** - * Asynchronously persists `event`. On successful persistence, `handler` is called with the - * persisted event. It is guaranteed that no new commands will be received by a persistent actor - * between a call to `persist` and the execution of its `handler`. This also holds for - * multiple `persist` calls per received command. Internally, this is achieved by stashing new - * commands and unstashing them when the `event` has been persisted and handled. The stash used - * for that is an internal stash which doesn't interfere with the inherited user stash. - * - * An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted - * event is the sender of the corresponding command. This means that one can reply to a command - * sender within an event `handler`. - * - * Within an event handler, applications usually update persistent actor state using persisted event - * data, notify listeners and reply to command senders. - * - * If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will - * unconditionally be stopped. The reason that it cannot resume when persist fails is that it - * is unknown if the even was actually persisted or not, and therefore it is in an inconsistent - * state. Restarting on persistent failures will most likely fail anyway, since the journal - * is probably unavailable. It is better to stop the actor and after a back-off timeout start - * it again. - * - * @param event event to be persisted - * @param handler handler for each persisted `event` + * Internal API */ - def persist[A](event: A)(handler: A ⇒ Unit): Unit = { + @InternalApi + final private[akka] def internalPersist[A](event: A)(handler: A ⇒ Unit): Unit = { if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.") pendingStashingPersistInvocations += 1 pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) @@ -325,14 +305,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas } /** - * Asynchronously persists `events` in specified order. This is equivalent to calling - * `persist[A](event: A)(handler: A => Unit)` multiple times with the same `handler`, - * except that `events` are persisted atomically with this method. - * - * @param events events to be persisted - * @param handler handler for each persisted `events` + * Internal API */ - def persistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = { + @InternalApi + final private[akka] def internalPersistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = { if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.") if (events.nonEmpty) { events.foreach { event ⇒ @@ -345,29 +321,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas } /** - * Asynchronously persists `event`. On successful persistence, `handler` is called with the - * persisted event. - * - * Unlike `persist` the persistent actor will continue to receive incoming commands between the - * call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of - * of persist should be used when you favor throughput over the "command-2 only processed after - * command-1 effects' have been applied" guarantee, which is provided by the plain `persist` method. - * - * An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted - * event is the sender of the corresponding command. This means that one can reply to a command - * sender within an event `handler`. - * - * If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will - * unconditionally be stopped. The reason that it cannot resume when persist fails is that it - * is unknown if the even was actually persisted or not, and therefore it is in an inconsistent - * state. Restarting on persistent failures will most likely fail anyway, since the journal - * is probably unavailable. It is better to stop the actor and after a back-off timeout start - * it again. - * - * @param event event to be persisted - * @param handler handler for each persisted `event` + * Internal API */ - def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = { + @InternalApi + final private[akka] def internalPersistAsync[A](event: A)(handler: A ⇒ Unit): Unit = { if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.") pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) eventBatch ::= AtomicWrite(PersistentRepr(event, persistenceId = persistenceId, @@ -375,14 +332,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas } /** - * Asynchronously persists `events` in specified order. This is equivalent to calling - * `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`, - * except that `events` are persisted atomically with this method. - * - * @param events events to be persisted - * @param handler handler for each persisted `events` + * Internal API */ - def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = { + @InternalApi + final private[akka] def internalPersistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = { if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.") if (events.nonEmpty) { events.foreach { event ⇒ @@ -394,23 +347,10 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas } /** - * Defer the handler execution until all pending handlers have been executed. - * Allows to define logic within the actor, which will respect the invocation-order-guarantee - * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before `deferAsync`, - * the corresponding handlers will be invoked in the same order as they were registered in. - * - * This call will NOT result in `event` being persisted, use `persist` or `persistAsync` instead - * if the given event should possible to replay. - * - * If there are no pending persist handler calls, the handler will be called immediately. - * - * If persistence of an earlier event fails, the persistent actor will stop, and the `handler` - * will not be run. - * - * @param event event to be handled in the future, when preceding persist operations have been processes - * @param handler handler for the given `event` + * Internal API */ - def deferAsync[A](event: A)(handler: A ⇒ Unit): Unit = { + @InternalApi + final private[akka] def internalDeferAsync[A](event: A)(handler: A ⇒ Unit): Unit = { if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.") if (pendingInvocations.isEmpty) { handler(event) diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala index 4d6a841547..830a956801 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala @@ -8,8 +8,10 @@ import java.lang.{ Iterable ⇒ JIterable } import akka.actor._ import akka.japi.Procedure import akka.japi.Util +import akka.persistence.Eventsourced.{ AsyncHandlerInvocation, StashingHandlerInvocation } import com.typesafe.config.Config +import scala.collection.immutable import scala.util.control.NoStackTrace abstract class RecoveryCompleted @@ -156,10 +158,111 @@ final class DiscardConfigurator extends StashOverflowStrategyConfigurator { } /** - * An persistent Actor - can be used to implement command or event sourcing. + * Scala API: A persistent Actor - can be used to implement command or event sourcing. */ trait PersistentActor extends Eventsourced with PersistenceIdentity { def receive = receiveCommand + + /** + * Asynchronously persists `event`. On successful persistence, `handler` is called with the + * persisted event. It is guaranteed that no new commands will be received by a persistent actor + * between a call to `persist` and the execution of its `handler`. This also holds for + * multiple `persist` calls per received command. Internally, this is achieved by stashing new + * commands and unstashing them when the `event` has been persisted and handled. The stash used + * for that is an internal stash which doesn't interfere with the inherited user stash. + * + * An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted + * event is the sender of the corresponding command. This means that one can reply to a command + * sender within an event `handler`. + * + * Within an event handler, applications usually update persistent actor state using persisted event + * data, notify listeners and reply to command senders. + * + * If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will + * unconditionally be stopped. The reason that it cannot resume when persist fails is that it + * is unknown if the even was actually persisted or not, and therefore it is in an inconsistent + * state. Restarting on persistent failures will most likely fail anyway, since the journal + * is probably unavailable. It is better to stop the actor and after a back-off timeout start + * it again. + * + * @param event event to be persisted + * @param handler handler for each persisted `event` + */ + def persist[A](event: A)(handler: A ⇒ Unit): Unit = { + internalPersist(event)(handler) + } + + /** + * Asynchronously persists `events` in specified order. This is equivalent to calling + * `persist[A](event: A)(handler: A => Unit)` multiple times with the same `handler`, + * except that `events` are persisted atomically with this method. + * + * @param events events to be persisted + * @param handler handler for each persisted `events` + */ + def persistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = { + internalPersistAll(events)(handler) + } + + /** + * Asynchronously persists `event`. On successful persistence, `handler` is called with the + * persisted event. + * + * Unlike `persist` the persistent actor will continue to receive incoming commands between the + * call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of + * of persist should be used when you favor throughput over the "command-2 only processed after + * command-1 effects' have been applied" guarantee, which is provided by the plain `persist` method. + * + * An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted + * event is the sender of the corresponding command. This means that one can reply to a command + * sender within an event `handler`. + * + * If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will + * unconditionally be stopped. The reason that it cannot resume when persist fails is that it + * is unknown if the even was actually persisted or not, and therefore it is in an inconsistent + * state. Restarting on persistent failures will most likely fail anyway, since the journal + * is probably unavailable. It is better to stop the actor and after a back-off timeout start + * it again. + * + * @param event event to be persisted + * @param handler handler for each persisted `event` + */ + def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = { + internalPersistAsync(event)(handler) + } + + /** + * Asynchronously persists `events` in specified order. This is equivalent to calling + * `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`, + * except that `events` are persisted atomically with this method. + * + * @param events events to be persisted + * @param handler handler for each persisted `events` + */ + def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = { + internalPersistAllAsync(events)(handler) + } + + /** + * Defer the handler execution until all pending handlers have been executed. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before `deferAsync`, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, use `persist` or `persistAsync` instead + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediately. + * + * If persistence of an earlier event fails, the persistent actor will stop, and the `handler` + * will not be run. + * + * @param event event to be handled in the future, when preceding persist operations have been processes + * @param handler handler for the given `event` + */ + def deferAsync[A](event: A)(handler: A ⇒ Unit): Unit = { + internalDeferAsync(event)(handler) + } } /** @@ -204,7 +307,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit * @param handler handler for each persisted `event` */ def persist[A](event: A, handler: Procedure[A]): Unit = - persist(event)(event ⇒ handler(event)) + internalPersist(event)(event ⇒ handler(event)) /** * Java API: asynchronously persists `events` in specified order. This is equivalent to calling @@ -215,7 +318,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit * @param handler handler for each persisted `events` */ def persistAll[A](events: JIterable[A], handler: Procedure[A]): Unit = - persistAll(Util.immutableSeq(events))(event ⇒ handler(event)) + internalPersistAll(Util.immutableSeq(events))(event ⇒ handler(event)) /** * JAVA API: asynchronously persists `event`. On successful persistence, `handler` is called with the @@ -241,7 +344,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit * @param handler handler for each persisted `event` */ def persistAsync[A](event: A)(handler: Procedure[A]): Unit = - super[Eventsourced].persistAsync(event)(event ⇒ handler(event)) + internalPersistAsync(event)(event ⇒ handler(event)) /** * JAVA API: asynchronously persists `events` in specified order. This is equivalent to calling @@ -252,7 +355,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit * @param handler handler for each persisted `events` */ def persistAllAsync[A](events: JIterable[A], handler: Procedure[A]): Unit = - super[Eventsourced].persistAllAsync(Util.immutableSeq(events))(event ⇒ handler(event)) + internalPersistAllAsync(Util.immutableSeq(events))(event ⇒ handler(event)) /** * Defer the handler execution until all pending handlers have been executed. @@ -272,7 +375,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit * @param handler handler for the given `event` */ def deferAsync[A](event: A)(handler: Procedure[A]): Unit = - super[Eventsourced].deferAsync(event)(event ⇒ handler(event)) + internalDeferAsync(event)(event ⇒ handler(event)) /** * Java API: recovery handler that receives persisted events during recovery. If a state snapshot @@ -303,7 +406,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit /** * Java API: an persistent actor - can be used to implement command or event sourcing. */ -abstract class AbstractPersistentActor extends AbstractActor with PersistentActor with Eventsourced { +abstract class AbstractPersistentActor extends AbstractActor with Eventsourced { /** * Recovery handler that receives persisted events during recovery. If a state snapshot @@ -360,7 +463,7 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo * @param handler handler for each persisted `event` */ def persist[A](event: A, handler: Procedure[A]): Unit = - persist(event)(event ⇒ handler(event)) + internalPersist(event)(event ⇒ handler(event)) /** * Java API: asynchronously persists `events` in specified order. This is equivalent to calling @@ -371,7 +474,7 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo * @param handler handler for each persisted `events` */ def persistAll[A](events: JIterable[A], handler: Procedure[A]): Unit = - persistAll(Util.immutableSeq(events))(event ⇒ handler(event)) + internalPersistAll(Util.immutableSeq(events))(event ⇒ handler(event)) /** * Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the @@ -392,7 +495,7 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo * @param handler handler for each persisted `event` */ def persistAsync[A](event: A, handler: Procedure[A]): Unit = - persistAsync(event)(event ⇒ handler(event)) + internalPersistAsync(event)(event ⇒ handler(event)) /** * Java API: asynchronously persists `events` in specified order. This is equivalent to calling @@ -403,7 +506,7 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo * @param handler handler for each persisted `events` */ def persistAllAsync[A](events: JIterable[A], handler: Procedure[A]): Unit = - persistAllAsync(Util.immutableSeq(events))(event ⇒ handler(event)) + internalPersistAllAsync(Util.immutableSeq(events))(event ⇒ handler(event)) /** * Defer the handler execution until all pending handlers have been executed. @@ -423,9 +526,7 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo * @param handler handler for the given `event` */ def deferAsync[A](event: A)(handler: Procedure[A]): Unit = - super.deferAsync(event)(event ⇒ handler(event)) - - override def receive = super[PersistentActor].receive + internalDeferAsync(event)(event ⇒ handler(event)) } diff --git a/project/MiMa.scala b/project/MiMa.scala index d43ad08dd2..65b6487ed1 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -435,6 +435,32 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.DurableStore#Store.this"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.LmdbDurableStore.dbPut"), + // #22218 Java Ambiguity in AbstractPersistentActor with Scala 2.12 + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.UntypedPersistentActor.deferAsync"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.UntypedPersistentActor.persistAllAsync"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.UntypedPersistentActor.persistAll"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.Eventsourced.deferAsync"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.Eventsourced.persistAllAsync"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.Eventsourced.persistAll"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalPersistAsync"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalPersist"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalPersistAll"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalDeferAsync"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalPersistAllAsync"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.UntypedPersistentActorWithAtLeastOnceDelivery.deliver"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.UntypedPersistentActorWithAtLeastOnceDelivery.deliver"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.AtLeastOnceDeliveryLike.deliver"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.AtLeastOnceDeliveryLike.deliver"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.AtLeastOnceDeliveryLike.internalDeliver"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.AtLeastOnceDeliveryLike.internalDeliver"), + ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.AbstractPersistentActorWithAtLeastOnceDelivery"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.AbstractPersistentActorWithAtLeastOnceDelivery.deliver"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.AbstractPersistentActorWithAtLeastOnceDelivery.deliver"), + ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.AbstractPersistentActor"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.AbstractPersistentActor.deferAsync"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.AbstractPersistentActor.persistAllAsync"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.AbstractPersistentActor.persistAll"), + // #22208 remove extension key ProblemFilters.exclude[MissingClassProblem]("akka.event.Logging$Extension$")