diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala index b8f38816b7..e88ecf9b57 100644 --- a/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala @@ -86,7 +86,7 @@ class `persistAsync, defer`(respondAfter: Int) extends PersistentActor { override def receiveCommand = { case n: Int => persistAsync(Evt(n)) { e => } - defer(Evt(n)) { e => if (e.i == respondAfter) sender() ! e.i } + deferAsync(Evt(n)) { e => if (e.i == respondAfter) sender() ! e.i } } override def receiveRecover = { case _ => // do nothing @@ -99,7 +99,7 @@ class `persistAsync, defer, respond ASAP`(respondAfter: Int) extends PersistentA override def receiveCommand = { case n: Int => persistAsync(Evt(n)) { e => } - defer(Evt(n)) { e => } + deferAsync(Evt(n)) { e => } if (n == respondAfter) sender() ! n } override def receiveRecover = { diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index d34c699513..e70f00b610 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -13,13 +13,9 @@ import akka.japi.Function; import akka.japi.Procedure; import akka.persistence.*; import scala.Option; -import scala.concurrent.duration.Duration; + import java.io.Serializable; -import java.util.concurrent.TimeUnit; - -import static java.util.Arrays.asList; - public class PersistenceDocTest { public interface SomeOtherMessage {} @@ -367,7 +363,7 @@ public class PersistenceDocTest { persistAsync(String.format("evt-%s-1", msg), replyToSender); persistAsync(String.format("evt-%s-2", msg), replyToSender); - defer(String.format("evt-%s-3", msg), replyToSender); + deferAsync(String.format("evt-%s-3", msg), replyToSender); } } //#defer diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index bc89356d5f..ab67803bb2 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -1,8 +1,8 @@ .. _migration-2.4: -################################ - Migration Guide 2.3.x to 2.4.x -################################ +############################## +Migration Guide 2.3.x to 2.4.x +############################## The 2.4 release contains some structural changes that require some simple, mechanical source-level changes in client code. @@ -160,8 +160,11 @@ Default interval for TestKit.awaitAssert changed to 100 ms Default check interval changed from 800 ms to 100 ms. You can define the interval explicitly if you need a longer interval. -persistenceId -============= +Akka Persistence +================ + +Mendatory persistenceId +----------------------- It is now mandatory to define the ``persistenceId`` in subclasses of ``PersistentActor``, ``UntypedPersistentActor`` and ``AbstractPersistentId``. diff --git a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst index 3cc94fd4f0..a06a4d488a 100644 --- a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst @@ -8,6 +8,14 @@ Migration Guide Akka Persistence (experimental) 2.3.3 to 2.3.4 (and 2.4.x) is provided for Persistence while under the *experimental* flag. The goal of this phase is to gather user feedback before we freeze the APIs in a major release. + +defer renamed to deferAsync +=========================== +The ``defer`` method in ``PersistentActor`` was renamed to ``deferAsync`` as it matches the semantics +of ``persistAsync`` more closely than ``persist``, which was causing confusion for users. + +Its semantics remain unchanged. + Renamed EventsourcedProcessor to PersistentActor ================================================ ``EventsourcedProcessor`` is now deprecated and replaced by ``PersistentActor`` which provides the same (and more) API. diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 432eaa9134..c4fe4ff5bf 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -240,7 +240,7 @@ object PersistenceDocSpec { sender() ! c persistAsync(s"evt-$c-1") { e => sender() ! e } persistAsync(s"evt-$c-2") { e => sender() ! e } - defer(s"evt-$c-3") { e => sender() ! e } + deferAsync(s"evt-$c-3") { e => sender() ! e } } } } diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 54c8dc3bab..f5e79be932 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -296,10 +296,10 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas /** * Defer the handler execution until all pending handlers have been executed. * Allows to define logic within the actor, which will respect the invocation-order-guarantee - * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, + * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before `deferAsync`, * the corresponding handlers will be invoked in the same order as they were registered in. * - * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, + * This call will NOT result in `event` being persisted, use `persist` or `persistAsync` instead * if the given event should possible to replay. * * If there are no pending persist handler calls, the handler will be called immediately. @@ -312,7 +312,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * @param event event to be handled in the future, when preceding persist operations have been processes * @param handler handler for the given `event` */ - final def defer[A](event: A)(handler: A ⇒ Unit): Unit = { + final def deferAsync[A](event: A)(handler: A ⇒ Unit): Unit = { if (pendingInvocations.isEmpty) { handler(event) } else { @@ -321,28 +321,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas } } - /** - * Defer the handler execution until all pending handlers have been executed. - * Allows to define logic within the actor, which will respect the invocation-order-guarantee - * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, - * the corresponding handlers will be invoked in the same order as they were registered in. - * - * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, - * if the given event should possible to replay. - * - * If there are no pending persist handler calls, the handler will be called immediately. - * - * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the - * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. - * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor by - * throwing ActorKilledException, thus the handlers will not be run. - * - * @param events event to be handled in the future, when preceding persist operations have been processes - * @param handler handler for each `event` - */ - final def defer[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = - events.foreach(defer(_)(handler)) - /** * Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`. * diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala index c145ebfa56..76c0d3e56b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala @@ -222,30 +222,8 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit * @param event event to be handled in the future, when preceding persist operations have been processes * @param handler handler for the given `event` */ - final def defer[A](event: A)(handler: Procedure[A]): Unit = - super[Eventsourced].defer(event)(event ⇒ handler(event)) - - /** - * Defer the handler execution until all pending handlers have been executed. - * Allows to define logic within the actor, which will respect the invocation-order-guarantee - * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, - * the corresponding handlers will be invoked in the same order as they were registered in. - * - * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, - * if the given event should possible to replay. - * - * If there are no pending persist handler calls, the handler will be called immediately. - * - * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the - * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. - * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers - * will not be run. - * - * @param events event to be handled in the future, when preceding persist operations have been processes - * @param handler handler for each `event` - */ - final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit = - super[Eventsourced].defer(Util.immutableSeq(events))(event ⇒ handler(event)) + final def deferAsync[A](event: A)(handler: Procedure[A]): Unit = + super[Eventsourced].deferAsync(event)(event ⇒ handler(event)) /** * Java API: recovery handler that receives persisted events during recovery. If a state snapshot @@ -330,6 +308,17 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo final def persistAsync[A](event: A, handler: Procedure[A]): Unit = persistAsync(event)(event ⇒ handler(event)) + /** + * Java API: asynchronously persists `events` in specified order. This is equivalent to calling + * `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`, + * except that `events` are persisted atomically with this method. + * + * @param events events to be persisted + * @param handler handler for each persisted `events` + */ + final def persistAsync[A](events: JIterable[A], handler: Procedure[A]): Unit = + persistAsync(Util.immutableSeq(events))(event ⇒ handler(event)) + /** * Defer the handler execution until all pending handlers have been executed. * Allows to define logic within the actor, which will respect the invocation-order-guarantee @@ -349,41 +338,8 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo * @param event event to be handled in the future, when preceding persist operations have been processes * @param handler handler for the given `event` */ - final def defer[A](event: A)(handler: Procedure[A]): Unit = - super.defer(event)(event ⇒ handler(event)) - - /** - * Defer the handler execution until all pending handlers have been executed. - * Allows to define logic within the actor, which will respect the invocation-order-guarantee - * in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer, - * the corresponding handlers will be invoked in the same order as they were registered in. - * - * This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`, - * if the given event should possible to replay. - * - * If there are no pending persist handler calls, the handler will be called immediately. - * - * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the - * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. - * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers - * will not be run. - * - * @param events event to be handled in the future, when preceding persist operations have been processes - * @param handler handler for each `event` - */ - final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit = - super.defer(Util.immutableSeq(events))(event ⇒ handler(event)) - - /** - * Java API: asynchronously persists `events` in specified order. This is equivalent to calling - * `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`, - * except that `events` are persisted atomically with this method. - * - * @param events events to be persisted - * @param handler handler for each persisted `events` - */ - final def persistAsync[A](events: JIterable[A], handler: Procedure[A]): Unit = - persistAsync(Util.immutableSeq(events))(event ⇒ handler(event)) + final def deferAsync[A](event: A)(handler: Procedure[A]): Unit = + super.deferAsync(event)(event ⇒ handler(event)) override def receive = super[PersistentActor].receive diff --git a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala index 58e13f0c1a..e39018555d 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala @@ -46,7 +46,7 @@ object PerformanceSpec { } val controlBehavior: Receive = { - case StopMeasure ⇒ defer(StopMeasure)(_ ⇒ sender() ! StopMeasure) + case StopMeasure ⇒ deferAsync(StopMeasure)(_ ⇒ sender() ! StopMeasure) case FailAt(sequenceNr) ⇒ failAt = sequenceNr } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 8423a051a9..586c5ab6c5 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -4,18 +4,17 @@ package akka.persistence -import scala.collection.immutable.Seq -import scala.concurrent.duration._ -import com.typesafe.config.Config -import akka.actor._ -import akka.testkit.{ ImplicitSender, AkkaSpec } -import akka.testkit.EventFilter -import akka.testkit.TestProbe import java.util.concurrent.atomic.AtomicInteger + +import akka.actor._ +import akka.testkit.{ AkkaSpec, ImplicitSender, TestLatch, TestProbe } +import com.typesafe.config.Config + +import scala.collection.immutable.Seq +import scala.concurrent.Await +import scala.concurrent.duration._ import scala.util.Random import scala.util.control.NoStackTrace -import akka.testkit.TestLatch -import scala.concurrent.Await object PersistentActorSpec { final case class Cmd(data: Any) @@ -370,19 +369,19 @@ object PersistentActorSpec { class DeferringWithPersistActor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { case Cmd(data) ⇒ - defer("d-1") { sender() ! _ } + deferAsync("d-1") { sender() ! _ } persist(s"$data-2") { sender() ! _ } - defer("d-3") { sender() ! _ } - defer("d-4") { sender() ! _ } + deferAsync("d-3") { sender() ! _ } + deferAsync("d-4") { sender() ! _ } } } class DeferringWithAsyncPersistActor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { case Cmd(data) ⇒ - defer(s"d-$data-1") { sender() ! _ } + deferAsync(s"d-$data-1") { sender() ! _ } persistAsync(s"pa-$data-2") { sender() ! _ } - defer(s"d-$data-3") { sender() ! _ } - defer(s"d-$data-4") { sender() ! _ } + deferAsync(s"d-$data-3") { sender() ! _ } + deferAsync(s"d-$data-4") { sender() ! _ } } } class DeferringMixedCallsPPADDPADPersistActor(name: String) extends ExamplePersistentActor(name) { @@ -390,18 +389,18 @@ object PersistentActorSpec { case Cmd(data) ⇒ persist(s"p-$data-1") { sender() ! _ } persistAsync(s"pa-$data-2") { sender() ! _ } - defer(s"d-$data-3") { sender() ! _ } - defer(s"d-$data-4") { sender() ! _ } + deferAsync(s"d-$data-3") { sender() ! _ } + deferAsync(s"d-$data-4") { sender() ! _ } persistAsync(s"pa-$data-5") { sender() ! _ } - defer(s"d-$data-6") { sender() ! _ } + deferAsync(s"d-$data-6") { sender() ! _ } } } class DeferringWithNoPersistCallsPersistActor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { case Cmd(data) ⇒ - defer("d-1") { sender() ! _ } - defer("d-2") { sender() ! _ } - defer("d-3") { sender() ! _ } + deferAsync("d-1") { sender() ! _ } + deferAsync("d-2") { sender() ! _ } + deferAsync("d-3") { sender() ! _ } } } diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java index 3e78fbb977..4f3a19d263 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java @@ -13,14 +13,9 @@ import akka.japi.pf.ReceiveBuilder; import akka.persistence.*; import scala.Option; import scala.PartialFunction; -import scala.concurrent.duration.Duration; import scala.runtime.BoxedUnit; import java.io.Serializable; -import java.util.concurrent.TimeUnit; - -import static java.util.Arrays.asList; - public class LambdaPersistenceDocTest { public interface SomeOtherMessage {} @@ -373,7 +368,7 @@ public class LambdaPersistenceDocTest { sender().tell(e, self()); }); - defer(String.format("evt-%s-3", c), e -> { + deferAsync(String.format("evt-%s-3", c), e -> { sender().tell(e, self()); }); }