Merge pull request #17593 from ktoso/wip-deferAsync-ktoso

!per #16797 rename defer to deferAsync, remove Seq version
This commit is contained in:
Konrad Malawski 2015-05-29 15:13:00 +02:00
commit 228ace497f
10 changed files with 61 additions and 126 deletions

View file

@ -86,7 +86,7 @@ class `persistAsync, defer`(respondAfter: Int) extends PersistentActor {
override def receiveCommand = {
case n: Int =>
persistAsync(Evt(n)) { e => }
defer(Evt(n)) { e => if (e.i == respondAfter) sender() ! e.i }
deferAsync(Evt(n)) { e => if (e.i == respondAfter) sender() ! e.i }
}
override def receiveRecover = {
case _ => // do nothing
@ -99,7 +99,7 @@ class `persistAsync, defer, respond ASAP`(respondAfter: Int) extends PersistentA
override def receiveCommand = {
case n: Int =>
persistAsync(Evt(n)) { e => }
defer(Evt(n)) { e => }
deferAsync(Evt(n)) { e => }
if (n == respondAfter) sender() ! n
}
override def receiveRecover = {

View file

@ -13,13 +13,9 @@ import akka.japi.Function;
import akka.japi.Procedure;
import akka.persistence.*;
import scala.Option;
import scala.concurrent.duration.Duration;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import static java.util.Arrays.asList;
public class PersistenceDocTest {
public interface SomeOtherMessage {}
@ -367,7 +363,7 @@ public class PersistenceDocTest {
persistAsync(String.format("evt-%s-1", msg), replyToSender);
persistAsync(String.format("evt-%s-2", msg), replyToSender);
defer(String.format("evt-%s-3", msg), replyToSender);
deferAsync(String.format("evt-%s-3", msg), replyToSender);
}
}
//#defer

View file

@ -1,8 +1,8 @@
.. _migration-2.4:
################################
Migration Guide 2.3.x to 2.4.x
################################
##############################
Migration Guide 2.3.x to 2.4.x
##############################
The 2.4 release contains some structural changes that require some
simple, mechanical source-level changes in client code.
@ -160,8 +160,11 @@ Default interval for TestKit.awaitAssert changed to 100 ms
Default check interval changed from 800 ms to 100 ms. You can define the interval explicitly if you need a
longer interval.
persistenceId
=============
Akka Persistence
================
Mendatory persistenceId
-----------------------
It is now mandatory to define the ``persistenceId`` in subclasses of ``PersistentActor``, ``UntypedPersistentActor``
and ``AbstractPersistentId``.

View file

@ -8,6 +8,14 @@ Migration Guide Akka Persistence (experimental) 2.3.3 to 2.3.4 (and 2.4.x)
is provided for Persistence while under the *experimental* flag. The goal of this phase is to gather user feedback
before we freeze the APIs in a major release.
defer renamed to deferAsync
===========================
The ``defer`` method in ``PersistentActor`` was renamed to ``deferAsync`` as it matches the semantics
of ``persistAsync`` more closely than ``persist``, which was causing confusion for users.
Its semantics remain unchanged.
Renamed EventsourcedProcessor to PersistentActor
================================================
``EventsourcedProcessor`` is now deprecated and replaced by ``PersistentActor`` which provides the same (and more) API.

View file

@ -240,7 +240,7 @@ object PersistenceDocSpec {
sender() ! c
persistAsync(s"evt-$c-1") { e => sender() ! e }
persistAsync(s"evt-$c-2") { e => sender() ! e }
defer(s"evt-$c-3") { e => sender() ! e }
deferAsync(s"evt-$c-3") { e => sender() ! e }
}
}
}

View file

@ -296,10 +296,10 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before `deferAsync`,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* This call will NOT result in `event` being persisted, use `persist` or `persistAsync` instead
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediately.
@ -312,7 +312,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* @param event event to be handled in the future, when preceding persist operations have been processes
* @param handler handler for the given `event`
*/
final def defer[A](event: A)(handler: A Unit): Unit = {
final def deferAsync[A](event: A)(handler: A Unit): Unit = {
if (pendingInvocations.isEmpty) {
handler(event)
} else {
@ -321,28 +321,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
}
}
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediately.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor by
* throwing ActorKilledException, thus the handlers will not be run.
*
* @param events event to be handled in the future, when preceding persist operations have been processes
* @param handler handler for each `event`
*/
final def defer[A](events: immutable.Seq[A])(handler: A Unit): Unit =
events.foreach(defer(_)(handler))
/**
* Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`.
*

View file

@ -222,30 +222,8 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit
* @param event event to be handled in the future, when preceding persist operations have been processes
* @param handler handler for the given `event`
*/
final def defer[A](event: A)(handler: Procedure[A]): Unit =
super[Eventsourced].defer(event)(event handler(event))
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediately.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param events event to be handled in the future, when preceding persist operations have been processes
* @param handler handler for each `event`
*/
final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit =
super[Eventsourced].defer(Util.immutableSeq(events))(event handler(event))
final def deferAsync[A](event: A)(handler: Procedure[A]): Unit =
super[Eventsourced].deferAsync(event)(event handler(event))
/**
* Java API: recovery handler that receives persisted events during recovery. If a state snapshot
@ -330,6 +308,17 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo
final def persistAsync[A](event: A, handler: Procedure[A]): Unit =
persistAsync(event)(event handler(event))
/**
* Java API: asynchronously persists `events` in specified order. This is equivalent to calling
* `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted
* @param handler handler for each persisted `events`
*/
final def persistAsync[A](events: JIterable[A], handler: Procedure[A]): Unit =
persistAsync(Util.immutableSeq(events))(event handler(event))
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
@ -349,41 +338,8 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo
* @param event event to be handled in the future, when preceding persist operations have been processes
* @param handler handler for the given `event`
*/
final def defer[A](event: A)(handler: Procedure[A]): Unit =
super.defer(event)(event handler(event))
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediately.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param events event to be handled in the future, when preceding persist operations have been processes
* @param handler handler for each `event`
*/
final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit =
super.defer(Util.immutableSeq(events))(event handler(event))
/**
* Java API: asynchronously persists `events` in specified order. This is equivalent to calling
* `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted
* @param handler handler for each persisted `events`
*/
final def persistAsync[A](events: JIterable[A], handler: Procedure[A]): Unit =
persistAsync(Util.immutableSeq(events))(event handler(event))
final def deferAsync[A](event: A)(handler: Procedure[A]): Unit =
super.deferAsync(event)(event handler(event))
override def receive = super[PersistentActor].receive

View file

@ -46,7 +46,7 @@ object PerformanceSpec {
}
val controlBehavior: Receive = {
case StopMeasure defer(StopMeasure)(_ sender() ! StopMeasure)
case StopMeasure deferAsync(StopMeasure)(_ sender() ! StopMeasure)
case FailAt(sequenceNr) failAt = sequenceNr
}

View file

@ -4,18 +4,17 @@
package akka.persistence
import scala.collection.immutable.Seq
import scala.concurrent.duration._
import com.typesafe.config.Config
import akka.actor._
import akka.testkit.{ ImplicitSender, AkkaSpec }
import akka.testkit.EventFilter
import akka.testkit.TestProbe
import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import akka.testkit.{ AkkaSpec, ImplicitSender, TestLatch, TestProbe }
import com.typesafe.config.Config
import scala.collection.immutable.Seq
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random
import scala.util.control.NoStackTrace
import akka.testkit.TestLatch
import scala.concurrent.Await
object PersistentActorSpec {
final case class Cmd(data: Any)
@ -370,19 +369,19 @@ object PersistentActorSpec {
class DeferringWithPersistActor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = {
case Cmd(data)
defer("d-1") { sender() ! _ }
deferAsync("d-1") { sender() ! _ }
persist(s"$data-2") { sender() ! _ }
defer("d-3") { sender() ! _ }
defer("d-4") { sender() ! _ }
deferAsync("d-3") { sender() ! _ }
deferAsync("d-4") { sender() ! _ }
}
}
class DeferringWithAsyncPersistActor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = {
case Cmd(data)
defer(s"d-$data-1") { sender() ! _ }
deferAsync(s"d-$data-1") { sender() ! _ }
persistAsync(s"pa-$data-2") { sender() ! _ }
defer(s"d-$data-3") { sender() ! _ }
defer(s"d-$data-4") { sender() ! _ }
deferAsync(s"d-$data-3") { sender() ! _ }
deferAsync(s"d-$data-4") { sender() ! _ }
}
}
class DeferringMixedCallsPPADDPADPersistActor(name: String) extends ExamplePersistentActor(name) {
@ -390,18 +389,18 @@ object PersistentActorSpec {
case Cmd(data)
persist(s"p-$data-1") { sender() ! _ }
persistAsync(s"pa-$data-2") { sender() ! _ }
defer(s"d-$data-3") { sender() ! _ }
defer(s"d-$data-4") { sender() ! _ }
deferAsync(s"d-$data-3") { sender() ! _ }
deferAsync(s"d-$data-4") { sender() ! _ }
persistAsync(s"pa-$data-5") { sender() ! _ }
defer(s"d-$data-6") { sender() ! _ }
deferAsync(s"d-$data-6") { sender() ! _ }
}
}
class DeferringWithNoPersistCallsPersistActor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = {
case Cmd(data)
defer("d-1") { sender() ! _ }
defer("d-2") { sender() ! _ }
defer("d-3") { sender() ! _ }
deferAsync("d-1") { sender() ! _ }
deferAsync("d-2") { sender() ! _ }
deferAsync("d-3") { sender() ! _ }
}
}

View file

@ -13,14 +13,9 @@ import akka.japi.pf.ReceiveBuilder;
import akka.persistence.*;
import scala.Option;
import scala.PartialFunction;
import scala.concurrent.duration.Duration;
import scala.runtime.BoxedUnit;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import static java.util.Arrays.asList;
public class LambdaPersistenceDocTest {
public interface SomeOtherMessage {}
@ -373,7 +368,7 @@ public class LambdaPersistenceDocTest {
sender().tell(e, self());
});
defer(String.format("evt-%s-3", c), e -> {
deferAsync(String.format("evt-%s-3", c), e -> {
sender().tell(e, self());
});
}