diff --git a/akka-bench-jmh/src/main/scala/akka/actor/ActorCreationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/ActorCreationBenchmark.scala index 74e440922e..777fd0fbe6 100644 --- a/akka-bench-jmh/src/main/scala/akka/actor/ActorCreationBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/actor/ActorCreationBenchmark.scala @@ -3,9 +3,10 @@ */ package akka.actor +import scala.concurrent.duration._ import java.util.concurrent.TimeUnit - import org.openjdk.jmh.annotations._ +import scala.concurrent.Await /* regex checking: @@ -28,14 +29,14 @@ class ActorCreationBenchmark { var i = 1 def name = { - i +=1 + i += 1 "some-rather-long-actor-name-actor-" + i } @TearDown(Level.Trial) def shutdown() { - system.shutdown() - system.awaitTermination() + system.terminate() + Await.ready(system.whenTerminated, 15.seconds) } @Benchmark diff --git a/akka-bench-jmh/src/main/scala/akka/actor/ActorPathValidationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/ActorPathValidationBenchmark.scala index d1f8044a7c..65647c76a7 100644 --- a/akka-bench-jmh/src/main/scala/akka/actor/ActorPathValidationBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/actor/ActorPathValidationBenchmark.scala @@ -11,7 +11,7 @@ import org.openjdk.jmh.annotations.Fork import org.openjdk.jmh.annotations.Measurement import org.openjdk.jmh.annotations.Mode import org.openjdk.jmh.annotations.OutputTimeUnit -import org.openjdk.jmh.annotations.Scope +import org.openjdk.jmh.annotations.{ Scope => JmhScope } import org.openjdk.jmh.annotations.State import org.openjdk.jmh.annotations.Warmup @@ -24,7 +24,7 @@ import org.openjdk.jmh.annotations.Warmup [info] a.a.ActorPathValidationBenchmark.oldActor_1 thrpt 20 1.585 0.090 ops/us */ @Fork(2) -@State(Scope.Benchmark) +@State(JmhScope.Benchmark) @BenchmarkMode(Array(Mode.Throughput)) @Warmup(iterations = 5) @Measurement(iterations = 10) @@ -36,8 +36,7 @@ class ActorPathValidationBenchmark { final val ElementRegex = """(?:[-\w:@&=+,.!~*'_;]|%\p{XDigit}{2})(?:[-\w:@&=+,.!~*'$_;]|%\p{XDigit}{2})*""".r - -// @Benchmark // blows up with stack overflow, we know + // @Benchmark // blows up with stack overflow, we know def old7000: Option[List[String]] = ElementRegex.unapplySeq(s) @Benchmark diff --git a/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala index d37fcdf000..243d0940f2 100644 --- a/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/actor/ForkJoinActorBenchmark.scala @@ -6,10 +6,9 @@ package akka.actor import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory import org.openjdk.jmh.annotations._ - import scala.concurrent.duration._ - import java.util.concurrent.TimeUnit +import scala.concurrent.Await @State(Scope.Benchmark) @BenchmarkMode(Array(Mode.Throughput)) @@ -50,8 +49,8 @@ class ForkJoinActorBenchmark { @TearDown(Level.Trial) def shutdown() { - system.shutdown() - system.awaitTermination() + system.terminate() + Await.ready(system.whenTerminated, 15.seconds) } @Benchmark @@ -105,10 +104,10 @@ object ForkJoinActorBenchmark { class Pipe(next: Option[ActorRef]) extends Actor { def receive = { case m @ `message` => - if(next.isDefined) next.get forward m - case s @ `stop` => + if (next.isDefined) next.get forward m + case s @ `stop` => context stop self - if(next.isDefined) next.get forward s + if (next.isDefined) next.get forward s } } class PingPong extends Actor { diff --git 
a/akka-bench-jmh/src/main/scala/akka/actor/ScheduleBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/ScheduleBenchmark.scala index 46db76ffce..ab7fff9f6c 100644 --- a/akka-bench-jmh/src/main/scala/akka/actor/ScheduleBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/actor/ScheduleBenchmark.scala @@ -63,8 +63,8 @@ class ScheduleBenchmark { @TearDown def shutdown() { - system.shutdown() - system.awaitTermination() + system.terminate() + Await.ready(system.whenTerminated, 15.seconds) } def op(idx: Int) = if (idx == winner) promise.trySuccess(idx) else idx diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/LevelDbBatchingBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/LevelDbBatchingBenchmark.scala index 56a8ca5ff2..aaa2001036 100644 --- a/akka-bench-jmh/src/main/scala/akka/persistence/LevelDbBatchingBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/persistence/LevelDbBatchingBenchmark.scala @@ -5,10 +5,11 @@ package akka.persistence import java.io.File import java.util.concurrent.TimeUnit - +import scala.concurrent.Await +import scala.concurrent.duration._ import akka.actor._ import akka.persistence.journal.AsyncWriteTarget._ -import akka.persistence.journal.leveldb.{SharedLeveldbJournal, SharedLeveldbStore} +import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore } import akka.testkit.TestProbe import org.apache.commons.io.FileUtils import org.openjdk.jmh.annotations._ @@ -38,10 +39,10 @@ class LevelDbBatchingBenchmark { var probe: TestProbe = _ var store: ActorRef = _ - val batch_1 = List.fill(1) { PersistentRepr("data", 12, "pa") } - val batch_10 = List.fill(10) { PersistentRepr("data", 12, "pa") } - val batch_100 = List.fill(100) { PersistentRepr("data", 12, "pa") } - val batch_200 = List.fill(200) { PersistentRepr("data", 12, "pa") } + val batch_1 = List.fill(1) { AtomicWrite(PersistentRepr("data", 12, "pa")) } + val batch_10 = List.fill(10) { AtomicWrite(PersistentRepr("data", 12, "pa")) } + val batch_100 = List.fill(100) { AtomicWrite(PersistentRepr("data", 12, "pa")) } + val batch_200 = List.fill(200) { AtomicWrite(PersistentRepr("data", 12, "pa")) } @Setup(Level.Trial) def setup() { @@ -49,7 +50,7 @@ class LevelDbBatchingBenchmark { deleteStorage(sys) SharedLeveldbJournal.setStore(store, sys) - probe = TestProbe()(sys) + probe = TestProbe()(sys) store = sys.actorOf(Props[SharedLeveldbStore], "store") } @@ -58,8 +59,8 @@ class LevelDbBatchingBenchmark { store ! 
PoisonPill Thread.sleep(500) - sys.shutdown() - sys.awaitTermination() + sys.terminate() + Await.ready(sys.whenTerminated, 10.seconds) } @Benchmark diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala index e88ecf9b57..6d1a384eec 100644 --- a/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/persistence/PersistenceActorDeferBenchmark.scala @@ -3,6 +3,7 @@ */ package akka.persistence +import scala.concurrent.duration._ import org.openjdk.jmh.annotations._ import org.openjdk.jmh._ import com.typesafe.config.ConfigFactory @@ -11,6 +12,7 @@ import akka.testkit.TestProbe import java.io.File import org.apache.commons.io.FileUtils import org.openjdk.jmh.annotations.Scope +import scala.concurrent.Await /* # OS: OSX 10.9.3 @@ -55,8 +57,8 @@ class PersistentActorDeferBenchmark { @TearDown def shutdown() { - system.shutdown() - system.awaitTermination() + system.terminate() + Await.ready(system.whenTerminated, 15.seconds) storageLocations.foreach(FileUtils.deleteDirectory) } diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala index 1825377638..ce95c367cd 100644 --- a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala @@ -3,6 +3,7 @@ */ package akka.persistence +import scala.concurrent.duration._ import org.openjdk.jmh.annotations._ import org.openjdk.jmh._ import com.typesafe.config.ConfigFactory @@ -11,6 +12,7 @@ import akka.testkit.TestProbe import java.io.File import org.apache.commons.io.FileUtils import org.openjdk.jmh.annotations.Scope +import scala.concurrent.Await @State(Scope.Benchmark) @BenchmarkMode(Array(Mode.Throughput)) @@ -53,8 +55,8 @@ class PersistentActorThroughputBenchmark { @TearDown def shutdown() { - system.shutdown() - system.awaitTermination() + system.terminate() + Await.ready(system.whenTerminated, 15.seconds) storageLocations.foreach(FileUtils.deleteDirectory) } diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala index a3f6dc600b..a7920cbf7e 100644 --- a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala @@ -3,15 +3,15 @@ */ package akka.persistence - +import scala.concurrent.duration._ import org.openjdk.jmh.annotations._ import akka.actor._ import akka.testkit.TestProbe import java.io.File import org.apache.commons.io.FileUtils import org.openjdk.jmh.annotations.Scope - import scala.concurrent.duration._ +import scala.concurrent.Await @State(Scope.Benchmark) @BenchmarkMode(Array(Mode.Throughput)) @@ -52,8 +52,8 @@ class PersistentActorWithAtLeastOnceDeliveryBenchmark { @TearDown def shutdown() { - system.shutdown() - system.awaitTermination() + system.terminate() + Await.ready(system.whenTerminated, 15.seconds) storageLocations.foreach(FileUtils.deleteDirectory) } @@ -63,7 +63,7 @@ class PersistentActorWithAtLeastOnceDeliveryBenchmark { def persistentActor_persistAsync_with_AtLeastOnceDelivery() { for (i <- 1 to dataCount) 
persistAsyncPersistentActorWithAtLeastOnceDelivery.tell(i, probe.ref) - probe.expectMsg(20 seconds, Evt(dataCount)) + probe.expectMsg(20.seconds, Evt(dataCount)) } @Benchmark @@ -71,7 +71,7 @@ class PersistentActorWithAtLeastOnceDeliveryBenchmark { def persistentActor_persist_with_AtLeastOnceDelivery() { for (i <- 1 to dataCount) persistPersistentActorWithAtLeastOnceDelivery.tell(i, probe.ref) - probe.expectMsg(2 minutes, Evt(dataCount)) + probe.expectMsg(2.minutes, Evt(dataCount)) } @Benchmark @@ -79,7 +79,7 @@ class PersistentActorWithAtLeastOnceDeliveryBenchmark { def persistentActor_noPersist_with_AtLeastOnceDelivery() { for (i <- 1 to dataCount) noPersistPersistentActorWithAtLeastOnceDelivery.tell(i, probe.ref) - probe.expectMsg(20 seconds, Evt(dataCount)) + probe.expectMsg(20.seconds, Evt(dataCount)) } } @@ -94,7 +94,7 @@ class NoPersistPersistentActorWithAtLeastOnceDelivery(respondAfter: Int, val upS deliver(downStream, deliveryId => Msg(deliveryId, n)) if (n == respondAfter) - //switch to wait all message confirmed + // switch to waiting until all messages are confirmed context.become(waitConfirm) case Confirm(deliveryId) => confirmDelivery(deliveryId) @@ -128,9 +128,9 @@ class PersistPersistentActorWithAtLeastOnceDelivery(respondAfter: Int, val upStr deliver(downStream, deliveryId => Msg(deliveryId, n)) if (n == respondAfter) - //switch to wait all message confirmed + // switch to waiting until all messages are confirmed context.become(waitConfirm) - } + } case Confirm(deliveryId) => confirmDelivery(deliveryId) case _ => // do nothing @@ -163,7 +163,7 @@ class PersistAsyncPersistentActorWithAtLeastOnceDelivery(respondAfter: Int, val deliver(downStream, deliveryId => Msg(deliveryId, n)) if (n == respondAfter) - //switch to wait all message confirmed + // switch to waiting until all messages are confirmed context.become(waitConfirm) } case Confirm(deliveryId) => diff --git a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java index 107006f849..5d4d57aeef 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java @@ -8,7 +8,7 @@ import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.Optional; - +import java.util.function.Consumer; import akka.actor.*; import akka.dispatch.Futures; import com.typesafe.config.Config; @@ -19,8 +19,6 @@ import akka.persistence.japi.snapshot.JavaSnapshotStoreSpec; import akka.persistence.journal.leveldb.SharedLeveldbJournal; import akka.persistence.journal.leveldb.SharedLeveldbStore; import scala.concurrent.Future; -import akka.japi.Option; -import akka.japi.Procedure; //#plugin-imports import akka.persistence.*; @@ -87,7 +85,7 @@ public class PersistencePluginDocTest { class MyAsyncJournal extends AsyncWriteJournal { @Override - public Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> messages) { + public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(Iterable<AtomicWrite> messages) { return null; } @@ -98,7 +96,7 @@ public class PersistencePluginDocTest { @Override public Future<Void> doAsyncReplayMessages(String persistenceId, long fromSequenceNr, - long toSequenceNr, long max, Procedure<PersistentRepr> replayCallback) { + long toSequenceNr, long max, Consumer<PersistentRepr> replayCallback) { return null; } diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index cf650d0bda..cd1987df6a 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -138,13 +138,9
@@ the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` calls in context of a single command. If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default) -and the actor will unconditionally be stopped. The reason that it cannot resume when persist fails -is that it is unknown if the even was actually persisted or not, and therefore it is in an inconsistent -state. Restarting on persistent failures will most likely fail anyway, since the journal is probably -unavailable. It is better to stop the actor and after a back-off timeout start it again. The -``akka.persistence.BackoffSupervisor`` actor is provided to support such restarts. - -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#backoff +and the actor will unconditionally be stopped. If persistence of an event is rejected before it is +stored, e.g. due to serialization error, ``onPersistRejected`` will be invoked (logging a warning +by default) and the actor continues with the next message. The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Persistence Samples in Java with Lambdas `_. @@ -220,8 +216,8 @@ and before any other received messages. .. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-completed -If there is a problem with recovering the state of the actor from the journal, the error will be logged and the -actor will be stopped. +If there is a problem with recovering the state of the actor from the journal, ``onReplayFailure`` +is called (logging the error by default) and the actor will be stopped. Relaxed local consistency requirements and high throughput use-cases @@ -274,6 +270,40 @@ of the command for which this ``defer`` handler was called. The callback will not be invoked if the actor is restarted (or stopped) in between the call to ``defer`` and the journal has processed and confirmed all preceding writes. +Failures +-------- + +If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default) +and the actor will unconditionally be stopped. + +The reason that it cannot resume when persist fails is that it is unknown if the event was actually +persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures +will most likely fail anyway, since the journal is probably unavailable. It is better to stop the +actor and after a back-off timeout start it again. The ``akka.persistence.BackoffSupervisor`` actor +is provided to support such restarts. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#backoff + +If persistence of an event is rejected before it is stored, e.g. due to serialization error, +``onPersistRejected`` will be invoked (logging a warning by default) and the actor continues with +the next message. + +If there is a problem with recovering the state of the actor from the journal when the actor is +started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped. + +Atomic writes +------------- + +Each event is of course stored atomically, but it is also possible to store several events atomically by +using the ``persistAll`` or ``persistAllAsync`` method.
That means that all events passed to that method +are stored or none of them are stored if there is an error. + +The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by +``persistAll``. + +Some journals may not support atomic writes of several events and they will then reject the ``persistAll`` +command, i.e. ``onPersistRejected`` is called with an exception (typically ``UnsupportedOperationException``). + Batch writes ------------ @@ -287,11 +317,6 @@ the maximum throughput dramatically. A new batch write is triggered by a persistent actor as soon as a batch reaches the maximum size or if the journal completed writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum. -The batches are also used internally to ensure atomic writes of events. All events that are persisted in context -of a single command are written as a single batch to the journal (even if ``persist`` is called multiple times per command). -The recovery of an ``AbstractPersistentActor`` will therefore never be done partially (with only a subset of events persisted by a -single command). - Message deletion ---------------- diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 8a1634764a..119c08b171 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -139,13 +139,9 @@ the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` calls in context of a single command. If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default) -and the actor will unconditionally be stopped. The reason that it cannot resume when persist fails -is that it is unknown if the even was actually persisted or not, and therefore it is in an inconsistent -state. Restarting on persistent failures will most likely fail anyway, since the journal is probably -unavailable. It is better to stop the actor and after a back-off timeout start it again. The -``akka.persistence.BackoffSupervisor`` actor is provided to support such restarts. - -.. includecode:: code/docs/persistence/PersistenceDocTest.java#backoff +and the actor will unconditionally be stopped. If persistence of an event is rejected before it is +stored, e.g. due to serialization error, ``onPersistRejected`` will be invoked (logging a warning +by default) and the actor continues with the next message. The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Persistence Samples with Java `_. @@ -222,8 +218,8 @@ and before any other received messages. .. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed -If there is a problem with recovering the state of the actor from the journal, the error will be logged and the -actor will be stopped. +If there is a problem with recovering the state of the actor from the journal, ``onReplayFailure`` +is called (logging the error by default) and the actor will be stopped. .. _persist-async-java: @@ -277,6 +273,40 @@ of the command for which this ``defer`` handler was called. The callback will not be invoked if the actor is restarted (or stopped) in between the call to ``defer`` and the journal has processed and confirmed all preceding writes. +Failures +-------- + +If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default) +and the actor will unconditionally be stopped.
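
Since the stopped actor is typically started again after a back-off (as the next paragraphs explain), the following is a minimal Scala sketch of such a restart, in the spirit of the ``#backoff`` sample referenced by these docs; the actor classes, names, and timings are hypothetical, not part of this diff:

```scala
import scala.concurrent.duration._
import akka.actor.{ Actor, Props }
import akka.persistence.{ BackoffSupervisor, PersistentActor }

// Hypothetical persistent actor; stands in for the real child.
class MyPersistentActor extends PersistentActor {
  override def persistenceId = "my-id"
  override def receiveCommand = Actor.emptyBehavior
  override def receiveRecover = Actor.emptyBehavior
}

class Parent extends Actor {
  // If the child stops (e.g. after onPersistFailure), the supervisor
  // starts it again after an exponentially growing back-off.
  val supervisor = context.actorOf(
    BackoffSupervisor.props(
      Props[MyPersistentActor],
      childName = "myActor",
      minBackoff = 3.seconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2), // adds jitter so restarts don't synchronize
    name = "mySupervisor")

  def receive = {
    case msg => supervisor forward msg
  }
}
```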
+ +The reason that it cannot resume when persist fails is that it is unknown if the event was actually +persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures +will most likely fail anyway, since the journal is probably unavailable. It is better to stop the +actor and after a back-off timeout start it again. The ``akka.persistence.BackoffSupervisor`` actor +is provided to support such restarts. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#backoff + +If persistence of an event is rejected before it is stored, e.g. due to serialization error, +``onPersistRejected`` will be invoked (logging a warning by default) and the actor continues with +the next message. + +If there is a problem with recovering the state of the actor from the journal when the actor is +started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped. + +Atomic writes +------------- + +Each event is of course stored atomically, but it is also possible to store several events atomically by +using the ``persistAll`` or ``persistAllAsync`` method. That means that all events passed to that method +are stored or none of them are stored if there is an error. + +The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by +``persistAll``. + +Some journals may not support atomic writes of several events and they will then reject the ``persistAll`` +command, i.e. ``onPersistRejected`` is called with an exception (typically ``UnsupportedOperationException``). + Batch writes ------------ @@ -290,12 +320,6 @@ the maximum throughput dramatically. A new batch write is triggered by a persistent actor as soon as a batch reaches the maximum size or if the journal completed writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum. -The batches are also used internally to ensure atomic writes of events. All events that are persisted in context -of a single command are written as a single batch to the journal (even if ``persist`` is called multiple times per command). -The recovery of an ``UntypedPersistentActor`` will therefore never be done partially (with only a subset of events persisted by a -single command). - - Message deletion ---------------- diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 61904ae6e9..64dab14be4 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -170,7 +170,6 @@ Default interval for TestKit.awaitAssert changed to 100 ms Default check interval changed from 800 ms to 100 ms. You can define the interval explicitly if you need a longer interval. - Secure Cookies ============== @@ -384,4 +383,43 @@ the Java 8 provided ``Optional`` type is used now. Please remember that when creating an ``java.util.Optional`` instance from a (possibly) ``null`` value you will want to use the non-throwing ``Optional.fromNullable`` method, which converts a ``null`` into a ``None`` value - which is -slightly different than its Scala counterpart (where ``Option.apply(null)`` returns ``None``). \ No newline at end of file +slightly different from its Scala counterpart (where ``Option.apply(null)`` returns ``None``). + +Atomic writes +------------- + +``asyncWriteMessages`` and ``writeMessages`` take an ``immutable.Seq[AtomicWrite]`` parameter instead of +``immutable.Seq[PersistentRepr]``.
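
To illustrate this signature change, here is a rough before/after sketch of a Scala journal under the 2.4 API described in the following paragraphs; ``MyJournal`` and the multi-event rejection are hypothetical examples, and the remaining plugin methods are stubbed:

```scala
import scala.collection.immutable
import scala.concurrent.Future
import scala.util.Try
import akka.persistence.{ AtomicWrite, PersistentRepr }
import akka.persistence.journal.AsyncWriteJournal

class MyJournal extends AsyncWriteJournal {
  // 2.3 signature was:
  //   def asyncWriteMessages(messages: immutable.Seq[PersistentRepr]): Future[Unit]
  // 2.4 returns one Try per AtomicWrite: a Failure means that write was
  // rejected (not stored); a failed Future means the whole batch failed.
  def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
    Future.successful(messages.map { w =>
      Try {
        if (w.payload.size > 1)
          throw new UnsupportedOperationException("atomic multi-event writes not supported")
        // ... store w.payload in the data store here ...
      }
    })

  def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ???
  def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
    replayCallback: PersistentRepr => Unit): Future[Unit] = ???
  def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = ???
}
```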
+ +Each ``AtomicWrite`` message contains the single ``PersistentRepr`` that corresponds to the event that was +passed to the ``persist`` method of the ``PersistentActor``, or it contains several ``PersistentRepr`` +that correspond to the events that were passed to the ``persistAll`` method of the ``PersistentActor``. +All ``PersistentRepr`` of the ``AtomicWrite`` must be written to the data store atomically, i.e. all or +none must be stored. + +If the journal (data store) cannot support atomic writes of multiple events it should +reject such writes with a ``Try`` ``Failure`` with an ``UnsupportedOperationException`` +describing the issue. This limitation should also be documented by the journal plugin. + +Rejecting writes +---------------- + +``asyncWriteMessages`` and ``writeMessages`` return a ``Future[immutable.Seq[Try[Unit]]]`` or an +``immutable.Seq[Try[Unit]]``, respectively. + +The journal can signal that it rejects individual messages (``AtomicWrite``) by the returned +``immutable.Seq[Try[Unit]]``. The returned ``Seq`` must have as many elements as the input +``messages`` ``Seq``. Each ``Try`` element signals whether the corresponding ``AtomicWrite`` +is rejected or not, with an exception describing the problem. Rejecting a message means it +was not stored, i.e. it must not be included in a later replay. Rejecting a message is +typically done before attempting to store it, e.g. because of serialization error. + +Read the API documentation of these methods for more information about the semantics of +rejections and failures. + +asyncReplayMessages Java API +---------------------------- + +The signature of ``asyncReplayMessages`` in the Java API changed from ``akka.japi.Procedure`` +to ``java.util.function.Consumer``. + diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala index 8684590172..b25b51225d 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -4,6 +4,7 @@ package docs.persistence +import scala.collection.immutable import akka.actor.Actor.Receive import akka.actor.ActorSystem import akka.testkit.TestKit @@ -11,6 +12,7 @@ import com.typesafe.config._ import org.scalatest.WordSpec import scala.collection.immutable.Seq import scala.concurrent.Future +import scala.util.Try import scala.concurrent.duration._ //#plugin-imports @@ -125,7 +127,7 @@ trait SharedLeveldbPluginDocSpec { } class MyJournal extends AsyncWriteJournal { - def asyncWriteMessages(messages: Seq[PersistentRepr]): Future[Unit] = ??? + def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = ??? def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ??? def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 22727ee49a..598f88fbec 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -124,13 +124,9 @@ the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` calls in context of a single command. If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default) -and the actor will unconditionally be stopped.
The reason that it cannot resume when persist fails -is that it is unknown if the even was actually persisted or not, and therefore it is in an inconsistent -state. Restarting on persistent failures will most likely fail anyway, since the journal is probably -unavailable. It is better to stop the actor and after a back-off timeout start it again. The -``akka.persistence.BackoffSupervisor`` actor is provided to support such restarts. - -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#backoff +and the actor will unconditionally be stopped. If persistence of an event is rejected before it is +stored, e.g. due to serialization error, ``onPersistRejected`` will be invoked (logging a warning +by default) and the actor continues with the next message. The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Persistence Samples with Scala `_. @@ -206,8 +202,8 @@ and before any other received messages. .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed -If there is a problem with recovering the state of the actor from the journal, the error will be logged and the -actor will be stopped. +If there is a problem with recovering the state of the actor from the journal, ``onReplayFailure`` +is called (logging the error by default) and the actor will be stopped. .. _persist-async-scala: @@ -263,6 +259,40 @@ The calling side will get the responses in this (guaranteed) order: The callback will not be invoked if the actor is restarted (or stopped) in between the call to ``defer`` and the journal has processed and confirmed all preceding writes. +Failures +-------- + +If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default) +and the actor will unconditionally be stopped. + +The reason that it cannot resume when persist fails is that it is unknown if the event was actually +persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures +will most likely fail anyway, since the journal is probably unavailable. It is better to stop the +actor and after a back-off timeout start it again. The ``akka.persistence.BackoffSupervisor`` actor +is provided to support such restarts. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#backoff + +If persistence of an event is rejected before it is stored, e.g. due to serialization error, +``onPersistRejected`` will be invoked (logging a warning by default) and the actor continues with +the next message. + +If there is a problem with recovering the state of the actor from the journal when the actor is +started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped. + +Atomic writes +------------- + +Each event is of course stored atomically, but it is also possible to store several events atomically by +using the ``persistAll`` or ``persistAllAsync`` method. That means that all events passed to that method +are stored or none of them are stored if there is an error. + +The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by +``persistAll``. + +Some journals may not support atomic writes of several events and they will then reject the ``persistAll`` +command, i.e. ``onPersistRejected`` is called with an exception (typically ``UnsupportedOperationException``). + .. _batch-writes: Batch writes @@ -278,10 +308,6 @@ the maximum throughput dramatically.
A new batch write is triggered by a persistent actor as soon as a batch reaches the maximum size or if the journal completed writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum. -The batches are also used internally to ensure atomic writes of events. All events that are persisted in context -of a single command are written as a single batch to the journal (even if ``persist`` is called multiple times per command). -The recovery of a ``PersistentActor`` will therefore never be done partially (with only a subset of events persisted by a -single command). Message deletion ---------------- diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala index d661389dc0..28c0b9c5c3 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -42,20 +42,40 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) { writeMessages(1, 5, pid, senderProbe.ref) } + /** + * Implementation may override and return false if it does not + * support atomic writes of several events, as emitted by `persistAll`. + */ + def supportsAtomicPersistAllOfSeveralEvents: Boolean = true + def journal: ActorRef = extension.journalFor(null) def replayedMessage(snr: Long, deleted: Boolean = false, confirms: Seq[String] = Nil): ReplayedMessage = ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, "", deleted, Actor.noSender)) - def writeMessages(from: Int, to: Int, pid: String, sender: ActorRef): Unit = { - val msgs = from to to map { i ⇒ PersistentRepr(payload = s"a-${i}", sequenceNr = i, persistenceId = pid, sender = sender) } + def writeMessages(fromSnr: Int, toSnr: Int, pid: String, sender: ActorRef): Unit = { + val msgs = + if (supportsAtomicPersistAllOfSeveralEvents) + (fromSnr to toSnr).map { i ⇒ + AtomicWrite(PersistentRepr(payload = s"a-$i", sequenceNr = i, persistenceId = pid, sender = sender)) + } + else + (fromSnr to toSnr - 1).map { i ⇒ + if (i == toSnr - 1) + AtomicWrite(List( + PersistentRepr(payload = s"a-$i", sequenceNr = i, persistenceId = pid, sender = sender), + PersistentRepr(payload = s"a-${i + 1}", sequenceNr = i + 1, persistenceId = pid, sender = sender))) + else + AtomicWrite(PersistentRepr(payload = s"a-${i}", sequenceNr = i, persistenceId = pid, sender = sender)) + } + val probe = TestProbe() journal ! WriteMessages(msgs, probe.ref, actorInstanceId) probe.expectMsg(WriteMessagesSuccessful) - from to to foreach { i ⇒ + fromSnr to toSnr foreach { i ⇒ probe.expectMsgPF() { case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`), _) ⇒ payload should be(s"a-${i}") } } } @@ -129,11 +149,9 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) { sub.expectMsg(cmd) journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref, replayDeleted = true) - 1 to 5 foreach { i ⇒ - i match { - case 1 | 2 | 3 ⇒ receiverProbe.expectMsg(replayedMessage(i, deleted = true)) - case 4 | 5 ⇒ receiverProbe.expectMsg(replayedMessage(i)) - } + (1 to 5).foreach { + case i @ (1 | 2 | 3) ⇒ receiverProbe.expectMsg(replayedMessage(i, deleted = true)) + case i @ (4 | 5) ⇒ receiverProbe.expectMsg(replayedMessage(i)) } } @@ -148,5 +166,31 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) { journal ! 
ReadHighestSequenceNr(0L, "non-existing-pid", receiverProbe.ref) receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(0)) } + + "reject non-serializable events" in { + // there is no chance that a journal could create a data representation for this type of event + val notSerializableEvent = new Object { override def toString = "not serializable" } + val msgs = (6 to 8).map { i ⇒ + val event = if (i == 7) notSerializableEvent else s"b-$i" + AtomicWrite(PersistentRepr(payload = event, sequenceNr = i, persistenceId = pid, sender = Actor.noSender)) + } + + val probe = TestProbe() + journal ! WriteMessages(msgs, probe.ref, actorInstanceId) + + probe.expectMsg(WriteMessagesSuccessful) + val Pid = pid + probe.expectMsgPF() { + case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender), _) ⇒ payload should be(s"b-6") + } + probe.expectMsgPF() { + case WriteMessageRejected(PersistentImpl(payload, 7L, Pid, _, _, Actor.noSender), _, _) ⇒ + payload should be(notSerializableEvent) + } + probe.expectMsgPF() { + case WriteMessageSuccess(PersistentImpl(payload, 8L, Pid, _, _, Actor.noSender), _) ⇒ payload should be(s"b-8") + } + + } } } diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java index 00b89cc40b..0292b46700 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java @@ -4,9 +4,10 @@ package akka.persistence.journal.japi; +import java.util.function.Consumer; + import scala.concurrent.Future; -import akka.japi.Procedure; import akka.persistence.PersistentRepr; interface AsyncRecoveryPlugin { @@ -34,7 +35,7 @@ interface AsyncRecoveryPlugin { * called to replay a single message. Can be called from any thread. */ Future<Void> doAsyncReplayMessages(String persistenceId, long fromSequenceNr, long toSequenceNr, long max, - Procedure<PersistentRepr> replayCallback); + Consumer<PersistentRepr> replayCallback); /** * Java API, Plugin API: asynchronously reads the highest stored sequence diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java index b5f8ae649f..4080d53fe4 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java @@ -4,6 +4,8 @@ package akka.persistence.journal.japi; +import java.util.Optional; + import scala.concurrent.Future; import akka.persistence.*; @@ -11,11 +13,50 @@ import akka.persistence.*; interface AsyncWritePlugin { //#async-write-plugin-api /** - * Java API, Plugin API: synchronously writes a batch of persistent messages - * to the journal. The batch write must be atomic i.e. either all persistent - * messages in the batch are written or none. + * Java API, Plugin API: asynchronously writes a batch (`Iterable`) of + * persistent messages to the journal. + * + * The batch is only for performance reasons, i.e. all messages don't have to + * be written atomically. Higher throughput can typically be achieved by using + * batch inserts of many records compared to inserting records one-by-one, but + * this aspect depends on the underlying data store and a journal + * implementation can implement it as efficiently as possible with the + * assumption that the messages of the batch are unrelated.
+ * + * Each `AtomicWrite` message contains the single `PersistentRepr` that + * corresponds to the event that was passed to the `persist` method of the + * `PersistentActor`, or it contains several `PersistentRepr` that correspond + * to the events that were passed to the `persistAll` method of the + * `PersistentActor`. All `PersistentRepr` of the `AtomicWrite` must be + * written to the data store atomically, i.e. all or none must be stored. If + * the journal (data store) cannot support atomic writes of multiple events it + * should reject such writes with an `Optional` containing an + * `UnsupportedOperationException` describing the issue. This limitation + * should also be documented by the journal plugin. + * + * If there are failures when storing any of the messages in the batch the + * returned `Future` must be completed with failure. The `Future` must only be + * completed with success when all messages in the batch have been confirmed + * to be stored successfully, i.e. they will be readable, and visible, in a + * subsequent replay. If there is uncertainty about whether the messages were + * stored or not, the `Future` must be completed with failure. + * + * Data store connection problems must be signaled by completing the `Future` + * with failure. + * + * The journal can also signal that it rejects individual messages + * (`AtomicWrite`) by the returned + * `Iterable&lt;Optional&lt;Exception&gt;&gt;`. The returned `Iterable` must + * have as many elements as the input `messages` `Iterable`. Each `Optional` + * element signals whether the corresponding `AtomicWrite` is rejected or not, with + * an exception describing the problem. Rejecting a message means it was not + * stored, i.e. it must not be included in a later replay. Rejecting a message + * is typically done before attempting to store it, e.g. because of + * serialization error. + * + * Data store connection problems must not be signaled as rejections. */ - Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> messages); + Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(Iterable<AtomicWrite> messages); /** * Java API, Plugin API: synchronously deletes all persistent messages up to diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java index 87e2d2a59f..11033ef76a 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java @@ -4,17 +4,57 @@ package akka.persistence.journal.japi; +import java.util.Optional; + import akka.persistence.*; import scala.concurrent.Future; interface SyncWritePlugin { //#sync-write-plugin-api /** - * Java API, Plugin API: synchronously writes a batch of persistent messages - * to the journal. The batch write must be atomic i.e. either all persistent - * messages in the batch are written or none. + * Java API, Plugin API: synchronously writes a batch (`Iterable`) of + * persistent messages to the journal. + * + * The batch is only for performance reasons, i.e. all messages don't have to + * be written atomically. Higher throughput can typically be achieved by using + * batch inserts of many records compared to inserting records one-by-one, but + * this aspect depends on the underlying data store and a journal + * implementation can implement it as efficiently as possible with the + * assumption that the messages of the batch are unrelated.
+ * + * Each `AtomicWrite` message contains the single `PersistentRepr` that + * corresponds to the event that was passed to the `persist` method of the + * `PersistentActor`, or it contains several `PersistentRepr` that correspond + * to the events that were passed to the `persistAll` method of the + * `PersistentActor`. All `PersistentRepr` of the `AtomicWrite` must be + * written to the data store atomically, i.e. all or none must be stored. If + * the journal (data store) cannot support atomic writes of multiple events it + * should reject such writes with an `Optional` containing an + * `UnsupportedOperationException` describing the issue. This limitation + * should also be documented by the journal plugin. + * + * If there are failures when storing any of the messages in the batch the + * method must throw an exception. The method must only return normally when + * all messages in the batch have been confirmed to be stored successfully, + * i.e. they will be readable, and visible, in a subsequent replay. If there + * is uncertainty about whether the messages were stored or not, the method must + * throw an exception. + * + * Data store connection problems must be signaled by throwing an exception. + * + * The journal can also signal that it rejects individual messages + * (`AtomicWrite`) by the returned + * `Iterable&lt;Optional&lt;Exception&gt;&gt;`. The returned `Iterable` must + * have as many elements as the input `messages` `Iterable`. Each `Optional` + * element signals whether the corresponding `AtomicWrite` is rejected or not, with + * an exception describing the problem. Rejecting a message means it was not + * stored, i.e. it must not be included in a later replay. Rejecting a message + * is typically done before attempting to store it, e.g. because of + * serialization error. + * + * Data store connection problems must not be signaled as rejections. */ - void doWriteMessages(Iterable<PersistentRepr> messages); + Iterable<Optional<Exception>> doWriteMessages(Iterable<AtomicWrite> messages); /** * Java API, Plugin API: synchronously deletes all persistent messages up to diff --git a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java index 03343f525d..b5d7500aa3 100644 --- a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java +++ b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java @@ -1833,6 +1833,692 @@ public final class MessageFormats { // @@protoc_insertion_point(class_scope:PersistentPayload) } + public interface AtomicWriteOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // repeated .PersistentMessage payload = 1; + /** + * repeated .PersistentMessage payload = 1; + */ + java.util.List<akka.persistence.serialization.MessageFormats.PersistentMessage> + getPayloadList(); + /** + * repeated .PersistentMessage payload = 1; + */ + akka.persistence.serialization.MessageFormats.PersistentMessage getPayload(int index); + /** + * repeated .PersistentMessage payload = 1; + */ + int getPayloadCount(); + /** + * repeated .PersistentMessage payload = 1; + */ + java.util.List<? extends akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder> + getPayloadOrBuilderList(); + /** + * repeated .PersistentMessage payload = 1; + */ + akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder getPayloadOrBuilder( + int index); + } + /** + * Protobuf type {@code AtomicWrite} + */ + public static final class AtomicWrite extends + com.google.protobuf.GeneratedMessage + implements AtomicWriteOrBuilder { + // Use AtomicWrite.newBuilder() to construct.
+ private AtomicWrite(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private AtomicWrite(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final AtomicWrite defaultInstance; + public static AtomicWrite getDefaultInstance() { + return defaultInstance; + } + + public AtomicWrite getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private AtomicWrite( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + if (!((mutable_bitField0_ & 0x00000001) == 0x00000001)) { + payload_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000001; + } + payload_.add(input.readMessage(akka.persistence.serialization.MessageFormats.PersistentMessage.PARSER, extensionRegistry)); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) { + payload_ = java.util.Collections.unmodifiableList(payload_); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.persistence.serialization.MessageFormats.internal_static_AtomicWrite_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.persistence.serialization.MessageFormats.internal_static_AtomicWrite_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.persistence.serialization.MessageFormats.AtomicWrite.class, akka.persistence.serialization.MessageFormats.AtomicWrite.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public AtomicWrite parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new AtomicWrite(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + // repeated .PersistentMessage payload = 1; + public static final int PAYLOAD_FIELD_NUMBER = 1; + private java.util.List payload_; + /** + * repeated .PersistentMessage payload = 1; + */ + public java.util.List getPayloadList() { + return payload_; + } + /** + * repeated .PersistentMessage payload = 1; + */ + public java.util.List + getPayloadOrBuilderList() { + return payload_; + } + /** + * repeated .PersistentMessage payload = 1; + 
*/ + public int getPayloadCount() { + return payload_.size(); + } + /** + * repeated .PersistentMessage payload = 1; + */ + public akka.persistence.serialization.MessageFormats.PersistentMessage getPayload(int index) { + return payload_.get(index); + } + /** + * repeated .PersistentMessage payload = 1; + */ + public akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder getPayloadOrBuilder( + int index) { + return payload_.get(index); + } + + private void initFields() { + payload_ = java.util.Collections.emptyList(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + for (int i = 0; i < getPayloadCount(); i++) { + if (!getPayload(i).isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + for (int i = 0; i < payload_.size(); i++) { + output.writeMessage(1, payload_.get(i)); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + for (int i = 0; i < payload_.size(); i++) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(1, payload_.get(i)); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.persistence.serialization.MessageFormats.AtomicWrite parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.persistence.serialization.MessageFormats.AtomicWrite parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.AtomicWrite parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.persistence.serialization.MessageFormats.AtomicWrite parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.AtomicWrite parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.persistence.serialization.MessageFormats.AtomicWrite parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.AtomicWrite parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.persistence.serialization.MessageFormats.AtomicWrite parseDelimitedFrom( + java.io.InputStream input, + 
com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.AtomicWrite parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.persistence.serialization.MessageFormats.AtomicWrite parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.persistence.serialization.MessageFormats.AtomicWrite prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code AtomicWrite} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder<Builder> + implements akka.persistence.serialization.MessageFormats.AtomicWriteOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.persistence.serialization.MessageFormats.internal_static_AtomicWrite_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.persistence.serialization.MessageFormats.internal_static_AtomicWrite_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.persistence.serialization.MessageFormats.AtomicWrite.class, akka.persistence.serialization.MessageFormats.AtomicWrite.Builder.class); + } + + // Construct using akka.persistence.serialization.MessageFormats.AtomicWrite.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getPayloadFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + if (payloadBuilder_ == null) { + payload_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000001); + } else { + payloadBuilder_.clear(); + } + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.persistence.serialization.MessageFormats.internal_static_AtomicWrite_descriptor; + } + + public akka.persistence.serialization.MessageFormats.AtomicWrite getDefaultInstanceForType() { + return akka.persistence.serialization.MessageFormats.AtomicWrite.getDefaultInstance(); + } + + public akka.persistence.serialization.MessageFormats.AtomicWrite build() { + akka.persistence.serialization.MessageFormats.AtomicWrite result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.persistence.serialization.MessageFormats.AtomicWrite buildPartial() { + 
akka.persistence.serialization.MessageFormats.AtomicWrite result = new akka.persistence.serialization.MessageFormats.AtomicWrite(this); + int from_bitField0_ = bitField0_; + if (payloadBuilder_ == null) { + if (((bitField0_ & 0x00000001) == 0x00000001)) { + payload_ = java.util.Collections.unmodifiableList(payload_); + bitField0_ = (bitField0_ & ~0x00000001); + } + result.payload_ = payload_; + } else { + result.payload_ = payloadBuilder_.build(); + } + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof akka.persistence.serialization.MessageFormats.AtomicWrite) { + return mergeFrom((akka.persistence.serialization.MessageFormats.AtomicWrite)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.persistence.serialization.MessageFormats.AtomicWrite other) { + if (other == akka.persistence.serialization.MessageFormats.AtomicWrite.getDefaultInstance()) return this; + if (payloadBuilder_ == null) { + if (!other.payload_.isEmpty()) { + if (payload_.isEmpty()) { + payload_ = other.payload_; + bitField0_ = (bitField0_ & ~0x00000001); + } else { + ensurePayloadIsMutable(); + payload_.addAll(other.payload_); + } + onChanged(); + } + } else { + if (!other.payload_.isEmpty()) { + if (payloadBuilder_.isEmpty()) { + payloadBuilder_.dispose(); + payloadBuilder_ = null; + payload_ = other.payload_; + bitField0_ = (bitField0_ & ~0x00000001); + payloadBuilder_ = + com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? + getPayloadFieldBuilder() : null; + } else { + payloadBuilder_.addAllMessages(other.payload_); + } + } + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + for (int i = 0; i < getPayloadCount(); i++) { + if (!getPayload(i).isInitialized()) { + + return false; + } + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.persistence.serialization.MessageFormats.AtomicWrite parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.persistence.serialization.MessageFormats.AtomicWrite) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // repeated .PersistentMessage payload = 1; + private java.util.List<akka.persistence.serialization.MessageFormats.PersistentMessage> payload_ = + java.util.Collections.emptyList(); + private void ensurePayloadIsMutable() { + if (!((bitField0_ & 0x00000001) == 0x00000001)) { + payload_ = new java.util.ArrayList<akka.persistence.serialization.MessageFormats.PersistentMessage>(payload_); + bitField0_ |= 0x00000001; + } + } + + private com.google.protobuf.RepeatedFieldBuilder< + akka.persistence.serialization.MessageFormats.PersistentMessage, akka.persistence.serialization.MessageFormats.PersistentMessage.Builder, akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder> payloadBuilder_; + + /** + * repeated .PersistentMessage payload = 1; + */ + public java.util.List<akka.persistence.serialization.MessageFormats.PersistentMessage> getPayloadList() { + if (payloadBuilder_ == null) { + return java.util.Collections.unmodifiableList(payload_); + } else { + return payloadBuilder_.getMessageList(); + } + } + /** + * repeated .PersistentMessage payload = 1; + */ + public int getPayloadCount() { + if (payloadBuilder_ == null) { + return payload_.size(); + } else { + return 
payloadBuilder_.getCount(); + } + } + /** + * repeated .PersistentMessage payload = 1; + */ + public akka.persistence.serialization.MessageFormats.PersistentMessage getPayload(int index) { + if (payloadBuilder_ == null) { + return payload_.get(index); + } else { + return payloadBuilder_.getMessage(index); + } + } + /** + * repeated .PersistentMessage payload = 1; + */ + public Builder setPayload( + int index, akka.persistence.serialization.MessageFormats.PersistentMessage value) { + if (payloadBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensurePayloadIsMutable(); + payload_.set(index, value); + onChanged(); + } else { + payloadBuilder_.setMessage(index, value); + } + return this; + } + /** + * repeated .PersistentMessage payload = 1; + */ + public Builder setPayload( + int index, akka.persistence.serialization.MessageFormats.PersistentMessage.Builder builderForValue) { + if (payloadBuilder_ == null) { + ensurePayloadIsMutable(); + payload_.set(index, builderForValue.build()); + onChanged(); + } else { + payloadBuilder_.setMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .PersistentMessage payload = 1; + */ + public Builder addPayload(akka.persistence.serialization.MessageFormats.PersistentMessage value) { + if (payloadBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensurePayloadIsMutable(); + payload_.add(value); + onChanged(); + } else { + payloadBuilder_.addMessage(value); + } + return this; + } + /** + * repeated .PersistentMessage payload = 1; + */ + public Builder addPayload( + int index, akka.persistence.serialization.MessageFormats.PersistentMessage value) { + if (payloadBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensurePayloadIsMutable(); + payload_.add(index, value); + onChanged(); + } else { + payloadBuilder_.addMessage(index, value); + } + return this; + } + /** + * repeated .PersistentMessage payload = 1; + */ + public Builder addPayload( + akka.persistence.serialization.MessageFormats.PersistentMessage.Builder builderForValue) { + if (payloadBuilder_ == null) { + ensurePayloadIsMutable(); + payload_.add(builderForValue.build()); + onChanged(); + } else { + payloadBuilder_.addMessage(builderForValue.build()); + } + return this; + } + /** + * repeated .PersistentMessage payload = 1; + */ + public Builder addPayload( + int index, akka.persistence.serialization.MessageFormats.PersistentMessage.Builder builderForValue) { + if (payloadBuilder_ == null) { + ensurePayloadIsMutable(); + payload_.add(index, builderForValue.build()); + onChanged(); + } else { + payloadBuilder_.addMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .PersistentMessage payload = 1; + */ + public Builder addAllPayload( + java.lang.Iterable<? extends akka.persistence.serialization.MessageFormats.PersistentMessage> values) { + if (payloadBuilder_ == null) { + ensurePayloadIsMutable(); + super.addAll(values, payload_); + onChanged(); + } else { + payloadBuilder_.addAllMessages(values); + } + return this; + } + /** + * repeated .PersistentMessage payload = 1; + */ + public Builder clearPayload() { + if (payloadBuilder_ == null) { + payload_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000001); + onChanged(); + } else { + payloadBuilder_.clear(); + } + return this; + } + /** + * repeated .PersistentMessage payload = 1; + */ + public Builder removePayload(int index) { + if (payloadBuilder_ == null) { + ensurePayloadIsMutable(); + payload_.remove(index); + onChanged(); + } else { + 
payloadBuilder_.remove(index); + } + return this; + } + /** + * repeated .PersistentMessage payload = 1; + */ + public akka.persistence.serialization.MessageFormats.PersistentMessage.Builder getPayloadBuilder( + int index) { + return getPayloadFieldBuilder().getBuilder(index); + } + /** + * repeated .PersistentMessage payload = 1; + */ + public akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder getPayloadOrBuilder( + int index) { + if (payloadBuilder_ == null) { + return payload_.get(index); } else { + return payloadBuilder_.getMessageOrBuilder(index); + } + } + /** + * repeated .PersistentMessage payload = 1; + */ + public java.util.List + getPayloadOrBuilderList() { + if (payloadBuilder_ != null) { + return payloadBuilder_.getMessageOrBuilderList(); + } else { + return java.util.Collections.unmodifiableList(payload_); + } + } + /** + * repeated .PersistentMessage payload = 1; + */ + public akka.persistence.serialization.MessageFormats.PersistentMessage.Builder addPayloadBuilder() { + return getPayloadFieldBuilder().addBuilder( + akka.persistence.serialization.MessageFormats.PersistentMessage.getDefaultInstance()); + } + /** + * repeated .PersistentMessage payload = 1; + */ + public akka.persistence.serialization.MessageFormats.PersistentMessage.Builder addPayloadBuilder( + int index) { + return getPayloadFieldBuilder().addBuilder( + index, akka.persistence.serialization.MessageFormats.PersistentMessage.getDefaultInstance()); + } + /** + * repeated .PersistentMessage payload = 1; + */ + public java.util.List + getPayloadBuilderList() { + return getPayloadFieldBuilder().getBuilderList(); + } + private com.google.protobuf.RepeatedFieldBuilder< + akka.persistence.serialization.MessageFormats.PersistentMessage, akka.persistence.serialization.MessageFormats.PersistentMessage.Builder, akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder> + getPayloadFieldBuilder() { + if (payloadBuilder_ == null) { + payloadBuilder_ = new com.google.protobuf.RepeatedFieldBuilder< + akka.persistence.serialization.MessageFormats.PersistentMessage, akka.persistence.serialization.MessageFormats.PersistentMessage.Builder, akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder>( + payload_, + ((bitField0_ & 0x00000001) == 0x00000001), + getParentForChildren(), + isClean()); + payload_ = null; + } + return payloadBuilder_; + } + + // @@protoc_insertion_point(builder_scope:AtomicWrite) + } + + static { + defaultInstance = new AtomicWrite(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:AtomicWrite) + } + public interface AtLeastOnceDeliverySnapshotOrBuilder extends com.google.protobuf.MessageOrBuilder { @@ -4032,6 +4718,11 @@ public final class MessageFormats { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_PersistentPayload_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_AtomicWrite_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_AtomicWrite_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor internal_static_AtLeastOnceDeliverySnapshot_descriptor; private static @@ -4062,16 +4753,17 @@ public final class MessageFormats { " \001(\t\022\017\n\007deleted\030\004 \001(\010\022\016\n\006sender\030\013 \001(\t\022\020\n" + "\010manifest\030\014 \001(\t\"S\n\021PersistentPayload\022\024\n\014" + "serializerId\030\001 \002(\005\022\017\n\007payload\030\002 
\002(\014\022\027\n\017p" + - "ayloadManifest\030\003 \001(\014\"\356\001\n\033AtLeastOnceDeli" + - "verySnapshot\022\031\n\021currentDeliveryId\030\001 \002(\003\022" + - "O\n\025unconfirmedDeliveries\030\002 \003(\01320.AtLeast" + - "OnceDeliverySnapshot.UnconfirmedDelivery", - "\032c\n\023UnconfirmedDelivery\022\022\n\ndeliveryId\030\001 " + - "\002(\003\022\023\n\013destination\030\002 \002(\t\022#\n\007payload\030\003 \002(" + - "\0132\022.PersistentPayload\"F\n\032PersistentState" + - "ChangeEvent\022\027\n\017stateIdentifier\030\001 \002(\t\022\017\n\007" + - "timeout\030\002 \001(\tB\"\n\036akka.persistence.serial" + - "izationH\001" + "ayloadManifest\030\003 \001(\014\"2\n\013AtomicWrite\022#\n\007p" + + "ayload\030\001 \003(\0132\022.PersistentMessage\"\356\001\n\033AtL" + + "eastOnceDeliverySnapshot\022\031\n\021currentDeliv" + + "eryId\030\001 \002(\003\022O\n\025unconfirmedDeliveries\030\002 \003", + "(\01320.AtLeastOnceDeliverySnapshot.Unconfi" + + "rmedDelivery\032c\n\023UnconfirmedDelivery\022\022\n\nd" + + "eliveryId\030\001 \002(\003\022\023\n\013destination\030\002 \002(\t\022#\n\007" + + "payload\030\003 \002(\0132\022.PersistentPayload\"F\n\032Per" + + "sistentStateChangeEvent\022\027\n\017stateIdentifi" + + "er\030\001 \002(\t\022\017\n\007timeout\030\002 \001(\tB\"\n\036akka.persis" + + "tence.serializationH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -4090,8 +4782,14 @@ public final class MessageFormats { com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_PersistentPayload_descriptor, new java.lang.String[] { "SerializerId", "Payload", "PayloadManifest", }); - internal_static_AtLeastOnceDeliverySnapshot_descriptor = + internal_static_AtomicWrite_descriptor = getDescriptor().getMessageTypes().get(2); + internal_static_AtomicWrite_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_AtomicWrite_descriptor, + new java.lang.String[] { "Payload", }); + internal_static_AtLeastOnceDeliverySnapshot_descriptor = + getDescriptor().getMessageTypes().get(3); internal_static_AtLeastOnceDeliverySnapshot_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_AtLeastOnceDeliverySnapshot_descriptor, @@ -4103,7 +4801,7 @@ public final class MessageFormats { internal_static_AtLeastOnceDeliverySnapshot_UnconfirmedDelivery_descriptor, new java.lang.String[] { "DeliveryId", "Destination", "Payload", }); internal_static_PersistentStateChangeEvent_descriptor = - getDescriptor().getMessageTypes().get(3); + getDescriptor().getMessageTypes().get(4); internal_static_PersistentStateChangeEvent_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_PersistentStateChangeEvent_descriptor, diff --git a/akka-persistence/src/main/protobuf/MessageFormats.proto b/akka-persistence/src/main/protobuf/MessageFormats.proto index 32fec171c0..72f96ab9d2 100644 --- a/akka-persistence/src/main/protobuf/MessageFormats.proto +++ b/akka-persistence/src/main/protobuf/MessageFormats.proto @@ -25,6 +25,10 @@ message PersistentPayload { optional bytes payloadManifest = 3; } +message AtomicWrite { + repeated PersistentMessage payload = 1; +} + message AtLeastOnceDeliverySnapshot { message UnconfirmedDelivery { required int64 deliveryId = 1; diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala 
b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index d54cbc919d..37aa394be4 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -132,6 +132,19 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas event.getClass.getName, seqNr, persistenceId) } + /** + * Called when the journal rejected `persist` of an event. The event was not + * stored. By default this method logs the problem as a warning, and the actor continues. + * The callback handler that was passed to the `persist` method will not be invoked. + * + * @param cause failure cause + * @param event the event that was to be persisted + */ + protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = { + log.warning("Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].", + event.getClass.getName, seqNr, persistenceId, cause.getMessage) + } + /** * User-overridable callback. Called when a persistent actor is started or restarted. * Default implementation sends a `Recover()` to `self`. Note that if you override @@ -264,7 +277,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas final def persist[A](event: A)(handler: A ⇒ Unit): Unit = { pendingStashingPersistInvocations += 1 pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) - eventBatch = PersistentRepr(event) :: eventBatch + eventBatch = AtomicWrite(PersistentRepr(event, sender = sender())) :: eventBatch } /** @@ -275,8 +288,13 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * @param events events to be persisted * @param handler handler for each persisted `events` */ - final def persistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = - events.foreach(persist(_)(handler)) // TODO the atomic part should be handled by issue #15377 + final def persistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = { + events.foreach { event ⇒ + pendingStashingPersistInvocations += 1 + pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + } + eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch + } @deprecated("use persistAll instead", "2.4") final def persist[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = @@ -307,7 +325,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas */ final def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = { pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) - eventBatch = PersistentRepr(event) :: eventBatch + eventBatch = AtomicWrite(PersistentRepr(event, sender = sender())) :: eventBatch } /** @@ -318,8 +336,12 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * @param events events to be persisted * @param handler handler for each persisted `events` */ - final def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = - events.foreach(persistAsync(_)(handler)) + final def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = { + events.foreach { event ⇒ + pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + } + eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch + } @deprecated("use persistAllAsync instead", 
"2.4") final def persistAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = @@ -526,6 +548,14 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas onWriteMessageComplete(err = false) } catch { case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e } } + case WriteMessageRejected(p, cause, id) ⇒ + // instanceId mismatch can happen for persistAsync and defer in case of actor restart + // while message is in flight, in that case the handler has already been discarded + if (id == instanceId) { + updateLastSequenceNr(p) + onWriteMessageComplete(err = false) + onPersistRejected(cause, p.payload, p.sequenceNr) + } case WriteMessageFailure(p, cause, id) ⇒ // instanceId mismatch can happen for persistAsync and defer in case of actor restart // while message is in flight, in that case the handler has already been discarded @@ -542,9 +572,11 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas onWriteMessageComplete(err = false) } catch { case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e } } - case WriteMessagesSuccessful | WriteMessagesFailed(_) ⇒ - // FIXME PN: WriteMessagesFailed? + case WriteMessagesSuccessful ⇒ if (journalBatch.isEmpty) writeInProgress = false else flushJournalBatch() + + case WriteMessagesFailed(_) ⇒ + () // it will be stopped by the first WriteMessageFailure message } def onWriteMessageComplete(err: Boolean): Unit = @@ -594,8 +626,9 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas } private def addToBatch(p: PersistentEnvelope): Unit = p match { - case p: PersistentRepr ⇒ - journalBatch :+= p.update(persistenceId = persistenceId, sequenceNr = nextSequenceNr(), sender = sender()) + case a: AtomicWrite ⇒ + journalBatch :+= a.copy(payload = + a.payload.map(_.update(persistenceId = persistenceId, sequenceNr = nextSequenceNr()))) case r: PersistentEnvelope ⇒ journalBatch :+= r } diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 27e9176f01..ce694e9b8d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -70,6 +70,17 @@ private[persistence] object JournalProtocol { final case class WriteMessageSuccess(persistent: PersistentRepr, actorInstanceId: Int) extends Response + /** + * Reply message to a rejected [[WriteMessages]] request. The write of this message was rejected before + * it was stored, e.g. because it could not be serialized. For each contained [[PersistentRepr]] message + * in the request, a separate reply is sent to the requestor. + * + * @param message message rejected to be written. + * @param cause failure cause. + */ + final case class WriteMessageRejected(message: PersistentRepr, cause: Throwable, actorInstanceId: Int) + extends Response + /** * Reply message to a failed [[WriteMessages]] request. For each contained [[PersistentRepr]] message * in the request, a separate reply is sent to the requestor. 
diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 7da9628b12..4d5989e8b6 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -4,6 +4,7 @@ package akka.persistence +import scala.collection.immutable import java.lang.{ Iterable ⇒ JIterable } import java.util.{ List ⇒ JList } @@ -16,18 +17,30 @@ import akka.persistence.serialization.Message * * Marks messages which can be resequenced by the [[akka.persistence.journal.AsyncWriteJournal]]. * - * In essence it is either an [[NonPersistentRepr]] or [[PersistentRepr]]. + * In essence it is either a [[NonPersistentRepr]] or an [[AtomicWrite]]. */ private[persistence] sealed trait PersistentEnvelope { def payload: Any def sender: ActorRef + def size: Int } /** * INTERNAL API * Message which can be resequenced by the Journal, but will not be persisted. */ -private[persistence] final case class NonPersistentRepr(payload: Any, sender: ActorRef) extends PersistentEnvelope +private[persistence] final case class NonPersistentRepr(payload: Any, sender: ActorRef) extends PersistentEnvelope { + override def size: Int = 1 +} + +object AtomicWrite { + def apply(event: PersistentRepr): AtomicWrite = apply(List(event)) +} + +final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends PersistentEnvelope with Message { + override def sender: ActorRef = ActorRef.noSender + override def size: Int = payload.size +} /** * Plugin API: representation of a persistent message in the journal plugin API. @@ -36,7 +49,7 @@ private[persistence] final case class NonPersistentRepr(payload: Any, sender: Ac * @see [[akka.persistence.journal.AsyncWriteJournal]] * @see [[akka.persistence.journal.AsyncRecovery]] */ -trait PersistentRepr extends PersistentEnvelope with Message { +trait PersistentRepr extends Message { /** * This persistent message's payload. diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 07ca18d587..547ac01ffe 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -8,10 +8,12 @@ package akka.persistence.journal import akka.actor._ import akka.pattern.pipe import akka.persistence._ - import scala.collection.immutable import scala.concurrent.Future -import scala.util._ +import scala.util.control.NonFatal +import scala.util.Try +import scala.util.Success +import scala.util.Failure /** * Abstract journal, optimized for asynchronous, non-blocking writes. @@ -27,24 +29,68 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { private val resequencer = context.actorOf(Props[Resequencer]()) private var resequencerCounter = 1L - final def receive = receiveWriteJournal orElse receivePluginInternal + final def receive = receiveWriteJournal.orElse[Any, Unit](receivePluginInternal) final val receiveWriteJournal: Actor.Receive = { case WriteMessages(messages, persistentActor, actorInstanceId) ⇒ val cctr = resequencerCounter - def resequence(f: PersistentRepr ⇒ Any) = messages.zipWithIndex.foreach { - case (p: PersistentRepr, i) ⇒ resequencer ! Desequenced(f(p), cctr + i + 1, persistentActor, p.sender) - case (r, i) ⇒ resequencer ! 
Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), cctr + i + 1, persistentActor, r.sender) + resequencerCounter += messages.foldLeft(0)((acc, m) ⇒ acc + m.size) + 1 + + val prepared = Try(preparePersistentBatch(messages)) + val writeResult = (prepared match { + case Success(prep) ⇒ + // in case the asyncWriteMessages throws + try asyncWriteMessages(prep) catch { case NonFatal(e) ⇒ Future.failed(e) } + case f @ Failure(_) ⇒ + // exception from preparePersistentBatch => rejected + Future.successful(messages.collect { case a: AtomicWrite ⇒ f }) + }).map { results ⇒ + if (results.size != prepared.get.size) + throw new IllegalStateException("asyncWriteMessages returned invalid number of results. " + + s"Expected [${prepared.get.size}], but got [${results.size}]") + results } - asyncWriteMessages(preparePersistentBatch(messages)) onComplete { - case Success(_) ⇒ + + writeResult.onComplete { + case Success(results) ⇒ resequencer ! Desequenced(WriteMessagesSuccessful, cctr, persistentActor, self) - resequence(WriteMessageSuccess(_, actorInstanceId)) + + val resultsIter = results.iterator + var n = cctr + 1 + messages.foreach { + case a: AtomicWrite ⇒ + resultsIter.next() match { + case Success(_) ⇒ + a.payload.foreach { p ⇒ + resequencer ! Desequenced(WriteMessageSuccess(p, actorInstanceId), n, persistentActor, p.sender) + n += 1 + } + case Failure(e) ⇒ + a.payload.foreach { p ⇒ + resequencer ! Desequenced(WriteMessageRejected(p, e, actorInstanceId), n, persistentActor, p.sender) + n += 1 + } + } + + case r: NonPersistentRepr ⇒ + resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender) + n += 1 + } + case Failure(e) ⇒ resequencer ! Desequenced(WriteMessagesFailed(e), cctr, persistentActor, self) - resequence(WriteMessageFailure(_, e, actorInstanceId)) + var n = cctr + 1 + messages.foreach { + case a: AtomicWrite ⇒ + a.payload.foreach { p ⇒ + resequencer ! Desequenced(WriteMessageFailure(p, e, actorInstanceId), n, persistentActor, p.sender) + n += 1 + } + case r: NonPersistentRepr ⇒ + resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender) + n += 1 + } } - resequencerCounter += messages.length + 1 case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor, replayDeleted) ⇒ // Send replayed messages and replay result to persistentActor directly. No need @@ -80,11 +126,42 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { //#journal-plugin-api /** - * Plugin API: asynchronously writes a batch of persistent messages to the journal. - * The batch write must be atomic i.e. either all persistent messages in the batch - * are written or none. + * Plugin API: asynchronously writes a batch (`Seq`) of persistent messages to the journal. + * + * The batch is only for performance reasons, i.e. all messages don't have to be written + * atomically. Higher throughput can typically be achieved by using batch inserts of many + * records compared to inserting records one-by-one, but this aspect depends on the underlying + * data store and a journal implementation can implement it as efficiently as possible with + * the assumption that the messages of the batch are unrelated. 
+ * + * Each `AtomicWrite` message contains the single `PersistentRepr` that corresponds to the + * event that was passed to the `persist` method of the `PersistentActor`, or it contains + * several `PersistentRepr` that correspond to the events that were passed to the `persistAll` + * method of the `PersistentActor`. All `PersistentRepr` of the `AtomicWrite` must be + * written to the data store atomically, i.e. all or none must be stored. + * If the journal (data store) cannot support atomic writes of multiple events it should + * reject such writes with a `Try` `Failure` with an `UnsupportedOperationException` + * describing the issue. This limitation should also be documented by the journal plugin. + * + * If there are failures when storing any of the messages in the batch, the returned + * `Future` must be completed with failure. The `Future` must only be completed with + * success when all messages in the batch have been confirmed to be stored successfully, + * i.e. they will be readable, and visible, in a subsequent replay. If there is uncertainty + * about whether the messages were stored or not, the `Future` must be completed with failure. + * + * Data store connection problems must be signaled by completing the `Future` with + * failure. + * + * The journal can also signal that it rejects individual messages (`AtomicWrite`) by + * the returned `immutable.Seq[Try[Unit]]`. The returned `Seq` must have as many elements + * as the input `messages` `Seq`. Each `Try` element signals whether the corresponding `AtomicWrite` + * is rejected or not, with an exception describing the problem. Rejecting a message means it + * was not stored, i.e. it must not be included in a later replay. Rejecting a message is + * typically done before attempting to store it, e.g. because of a serialization error. + * + * Data store connection problems must not be signaled as rejections. */ - def asyncWriteMessages(messages: immutable.Seq[PersistentRepr]): Future[Unit] + def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] /** * Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr` @@ -108,6 +185,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * INTERNAL API. */ private[persistence] object AsyncWriteJournal { + val successUnit: Success[Unit] = Success(()) + final case class Desequenced(msg: Any, snr: Long, target: ActorRef, sender: ActorRef) class Resequencer extends Actor { diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala index 219cdee103..d2950ca41a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala @@ -9,6 +9,7 @@ import akka.actor._ import akka.pattern.ask import akka.persistence._ import akka.util._ +import scala.util.Try import scala.collection.immutable import scala.concurrent._ @@ -39,8 +40,8 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash implicit def timeout: Timeout - def asyncWriteMessages(messages: immutable.Seq[PersistentRepr]): Future[Unit] = - (store ? WriteMessages(messages)).mapTo[Unit] + def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = + (store ? 
WriteMessages(messages)).mapTo[immutable.Seq[Try[Unit]]] def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = (store ? DeleteMessagesTo(persistenceId, toSequenceNr, permanent)).mapTo[Unit] @@ -68,7 +69,7 @@ private[persistence] object AsyncWriteProxy { */ private[persistence] object AsyncWriteTarget { @SerialVersionUID(1L) - final case class WriteMessages(messages: immutable.Seq[PersistentRepr]) + final case class WriteMessages(messages: immutable.Seq[AtomicWrite]) @SerialVersionUID(1L) final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala index e97f5e00f1..4519de45fe 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala @@ -7,11 +7,14 @@ package akka.persistence.journal import scala.collection.immutable import scala.util._ - import akka.actor.{ ActorLogging, Actor } import akka.pattern.pipe import akka.persistence._ +object SyncWriteJournal { + val successUnit: Success[Unit] = Success(()) +} + /** * Abstract journal, optimized for synchronous writes. */ @@ -24,18 +27,44 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery wi final def receive = { case WriteMessages(messages, persistentActor, actorInstanceId) ⇒ - Try(writeMessages(preparePersistentBatch(messages))) match { - case Success(_) ⇒ + val writeResult = Try { + val prepared = preparePersistentBatch(messages) + val results = writeMessages(prepared) + if (results.size != prepared.size) + throw new IllegalStateException("writeMessages returned invalid number of results. " + + s"Expected [${prepared.size}], but got [${results.size}]") + results + } + writeResult match { + case Success(results) ⇒ persistentActor ! WriteMessagesSuccessful + val resultsIter = results.iterator messages.foreach { - case p: PersistentRepr ⇒ persistentActor.tell(WriteMessageSuccess(p, actorInstanceId), p.sender) - case r ⇒ persistentActor.tell(LoopMessageSuccess(r.payload, actorInstanceId), r.sender) + case a: AtomicWrite ⇒ + resultsIter.next() match { + case Success(_) ⇒ + a.payload.foreach { p ⇒ + persistentActor.tell(WriteMessageSuccess(p, actorInstanceId), p.sender) + } + case Failure(e) ⇒ + a.payload.foreach { p ⇒ + persistentActor.tell(WriteMessageRejected(p, e, actorInstanceId), p.sender) + } + } + + case r: NonPersistentRepr ⇒ + persistentActor.tell(LoopMessageSuccess(r.payload, actorInstanceId), r.sender) } + case Failure(e) ⇒ persistentActor ! WriteMessagesFailed(e) messages.foreach { - case p: PersistentRepr ⇒ persistentActor.tell(WriteMessageFailure(p, e, actorInstanceId), p.sender) - case r ⇒ persistentActor.tell(LoopMessageSuccess(r.payload, actorInstanceId), r.sender) + case a: AtomicWrite ⇒ + a.payload.foreach { p ⇒ + persistentActor.tell(WriteMessageFailure(p, e, actorInstanceId), p.sender) + } + case r: NonPersistentRepr ⇒ + persistentActor.tell(LoopMessageSuccess(r.payload, actorInstanceId), r.sender) } throw e } @@ -70,11 +99,41 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery wi //#journal-plugin-api /** - * Plugin API: synchronously writes a batch of persistent messages to the journal. - * The batch write must be atomic i.e. 
either all persistent messages in the batch - are written or none. + * Plugin API: synchronously writes a batch (`Seq`) of persistent messages to the journal. + * + * The batch is only for performance reasons, i.e. all messages don't have to be written + * atomically. Higher throughput can typically be achieved by using batch inserts of many + * records compared to inserting records one-by-one, but this aspect depends on the underlying + * data store and a journal implementation can implement it as efficiently as possible with + * the assumption that the messages of the batch are unrelated. + * + * Each `AtomicWrite` message contains the single `PersistentRepr` that corresponds to the + * event that was passed to the `persist` method of the `PersistentActor`, or it contains + * several `PersistentRepr` that correspond to the events that were passed to the `persistAll` + * method of the `PersistentActor`. All `PersistentRepr` of the `AtomicWrite` must be + * written to the data store atomically, i.e. all or none must be stored. + * If the journal (data store) cannot support atomic writes of multiple events it should + * reject such writes with a `Try` `Failure` with an `UnsupportedOperationException` + * describing the issue. This limitation should also be documented by the journal plugin. + * + * If there are failures when storing any of the messages in the batch, the method must + * throw an exception. The method must only return normally when all messages in the + * batch have been confirmed to be stored successfully, i.e. they will be readable, + * and visible, in a subsequent replay. If there is uncertainty about whether the + * messages were stored or not, the method must throw an exception. + * + * Data store connection problems must be signaled by throwing an exception. + * + * The journal can also signal that it rejects individual messages (`AtomicWrite`) by + * the returned `immutable.Seq[Try[Unit]]`. The returned `Seq` must have as many elements + * as the input `messages` `Seq`. Each `Try` element signals whether the corresponding `AtomicWrite` + * is rejected or not, with an exception describing the problem. Rejecting a message means it + * was not stored, i.e. it must not be included in a later replay. Rejecting a message is + * typically done before attempting to store it, e.g. because of a serialization error. + * + * Data store connection problems must not be signaled as rejections. 
*/ - def writeMessages(messages: immutable.Seq[PersistentRepr]): Unit + def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] /** * Plugin API: synchronously deletes all persistent messages up to `toSequenceNr` diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala b/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala index 452ff62d41..8ae48406e0 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala @@ -8,6 +8,7 @@ import akka.actor.Actor import akka.persistence.{ Persistence, PersistentEnvelope, PersistentRepr } import scala.collection.immutable +import akka.persistence.AtomicWrite private[akka] trait WriteJournalBase { this: Actor ⇒ @@ -15,9 +16,11 @@ private[akka] trait WriteJournalBase { lazy val persistence = Persistence(context.system) private def eventAdapters = persistence.adaptersFor(self) - protected def preparePersistentBatch(rb: immutable.Seq[PersistentEnvelope]): immutable.Seq[PersistentRepr] = + protected def preparePersistentBatch(rb: immutable.Seq[PersistentEnvelope]): immutable.Seq[AtomicWrite] = rb.collect { // collect instead of flatMap to avoid Some allocations - case p: PersistentRepr ⇒ adaptToJournal(p.update(sender = Actor.noSender)) // don't store the sender + case a: AtomicWrite ⇒ + // don't store sender + a.copy(payload = a.payload.map(p ⇒ adaptToJournal(p.update(sender = Actor.noSender)))) } /** INTERNAL API */ diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index 79fe0b5c14..90375d8aee 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -7,11 +7,12 @@ package akka.persistence.journal.inmem import scala.collection.immutable import scala.concurrent.duration._ import scala.language.postfixOps - import akka.actor._ import akka.persistence._ +import akka.persistence.journal.AsyncWriteJournal import akka.persistence.journal.{ WriteJournalBase, AsyncWriteProxy, AsyncWriteTarget } import akka.util.Timeout +import scala.util.Try /** * INTERNAL API. 
@@ -36,17 +37,17 @@ private[persistence] trait InmemMessages { // persistenceId -> persistent message var messages = Map.empty[String, Vector[PersistentRepr]] - def add(p: PersistentRepr) = messages = messages + (messages.get(p.persistenceId) match { + def add(p: PersistentRepr): Unit = messages = messages + (messages.get(p.persistenceId) match { case Some(ms) ⇒ p.persistenceId -> (ms :+ p) case None ⇒ p.persistenceId -> Vector(p) }) - def update(pid: String, snr: Long)(f: PersistentRepr ⇒ PersistentRepr) = messages = messages.get(pid) match { + def update(pid: String, snr: Long)(f: PersistentRepr ⇒ PersistentRepr): Unit = messages = messages.get(pid) match { case Some(ms) ⇒ messages + (pid -> ms.map(sp ⇒ if (sp.sequenceNr == snr) f(sp) else sp)) case None ⇒ messages } - def delete(pid: String, snr: Long) = messages = messages.get(pid) match { + def delete(pid: String, snr: Long): Unit = messages = messages.get(pid) match { case Some(ms) ⇒ messages + (pid -> ms.filterNot(_.sequenceNr == snr)) case None ⇒ messages } @@ -76,7 +77,11 @@ private[persistence] class InmemStore extends Actor with InmemMessages with Writ def receive = { case WriteMessages(msgs) ⇒ - sender() ! msgs.foreach(add) + val results: immutable.Seq[Try[Unit]] = + for (a ← msgs) yield { + Try(a.payload.foreach(add)) + } + sender() ! results case DeleteMessagesTo(pid, tsnr, false) ⇒ sender() ! (1L to tsnr foreach { snr ⇒ update(pid, snr)(_.update(deleted = true)) }) case DeleteMessagesTo(pid, tsnr, true) ⇒ diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncRecovery.scala index 666add5438..6ac0428bc2 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncRecovery.scala @@ -4,10 +4,10 @@ package akka.persistence.journal.japi +import java.util.function.Consumer import scala.concurrent.Future import akka.actor.Actor -import akka.japi.Procedure import akka.persistence.journal.{ AsyncRecovery ⇒ SAsyncReplay } import akka.persistence.PersistentRepr @@ -18,8 +18,8 @@ abstract class AsyncRecovery extends SAsyncReplay with AsyncRecoveryPlugin { thi import context.dispatcher final def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit) = - doAsyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max, new Procedure[PersistentRepr] { - def apply(p: PersistentRepr) = replayCallback(p) + doAsyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max, new Consumer[PersistentRepr] { + def accept(p: PersistentRepr) = replayCallback(p) }).map(Unit.unbox) final def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala index a533b3868f..a6ac24f8c7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala @@ -6,18 +6,27 @@ package akka.persistence.journal.japi import scala.collection.immutable import scala.collection.JavaConverters._ - import akka.persistence._ import akka.persistence.journal.{ AsyncWriteJournal ⇒ SAsyncWriteJournal } +import scala.concurrent.Future +import 
scala.util.Try +import scala.util.Success +import scala.util.Failure /** * Java API: abstract journal, optimized for asynchronous, non-blocking writes. */ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal with AsyncWritePlugin { + import SAsyncWriteJournal.successUnit import context.dispatcher - final def asyncWriteMessages(messages: immutable.Seq[PersistentRepr]) = - doAsyncWriteMessages(messages.asJava).map(Unit.unbox) + final def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = + doAsyncWriteMessages(messages.asJava).map { results ⇒ + results.asScala.map { r ⇒ + if (r.isPresent) Failure(r.get) + else successUnit + }(collection.breakOut) + } final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) = doAsyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent).map(Unit.unbox) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala index eb774e5b0d..04e89232ca 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala @@ -6,9 +6,10 @@ package akka.persistence.journal.japi import scala.collection.immutable import scala.collection.JavaConverters._ - import akka.persistence._ import akka.persistence.journal.{ SyncWriteJournal ⇒ SSyncWriteJournal } +import scala.util.Try +import scala.util.Failure import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ @@ -17,8 +18,13 @@ import scala.concurrent.duration._ * Java API: abstract journal, optimized for synchronous writes. 
*/ abstract class SyncWriteJournal extends AsyncRecovery with SSyncWriteJournal with SyncWritePlugin { - final def writeMessages(messages: immutable.Seq[PersistentRepr]): Unit = - doWriteMessages(messages.asJava) + import SSyncWriteJournal.successUnit + + final def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] = + doWriteMessages(messages.asJava).asScala.map { o ⇒ + if (o.isPresent) Failure(o.get) + else successUnit + }(collection.breakOut) final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit = doDeleteMessagesTo(persistenceId, toSequenceNr, permanent) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index 828b8f4053..0b934e11a1 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -39,8 +39,10 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with import Key._ - def writeMessages(messages: immutable.Seq[PersistentRepr]) = - withBatch(batch ⇒ messages.foreach(message ⇒ addToMessageBatch(message, batch))) + def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] = + withBatch(batch ⇒ messages.map { a ⇒ + Try(a.payload.foreach(message ⇒ addToMessageBatch(message, batch))) + }) def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) = withBatch { batch ⇒ val nid = numericId(persistenceId) diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index cb0d86427a..bbbf9d9cb2 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -5,13 +5,12 @@ package akka.persistence.serialization import akka.actor.{ ActorPath, ExtendedActorSystem } -import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot ⇒ AtLeastOnceDeliverySnap, UnconfirmedDelivery } +import akka.persistence.AtLeastOnceDelivery._ import akka.persistence._ import akka.persistence.fsm.PersistentFsmActor.StateChangeEvent -import akka.persistence.serialization.MessageFormats._ +import akka.persistence.serialization.{ MessageFormats ⇒ mf } import akka.serialization._ import com.google.protobuf._ - import scala.collection.immutable.VectorBuilder import scala.concurrent.duration import akka.actor.Actor @@ -29,9 +28,10 @@ trait Message extends Serializable class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer { import PersistentRepr.Undefined + val AtomicWriteClass = classOf[AtomicWrite] val PersistentReprClass = classOf[PersistentRepr] val PersistentImplClass = classOf[PersistentImpl] - val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap] + val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnapshot] val PersistentStateChangeEventClass = classOf[StateChangeEvent] private lazy val serialization = SerializationExtension(system) @@ -49,10 +49,11 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer * message's payload to a matching `akka.serialization.Serializer`. 
*/ def toBinary(o: AnyRef): Array[Byte] = o match { - case p: PersistentRepr ⇒ persistentMessageBuilder(p).build().toByteArray - case a: AtLeastOnceDeliverySnap ⇒ atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray - case s: StateChangeEvent ⇒ stateChangeBuilder(s).build.toByteArray - case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") + case p: PersistentRepr ⇒ persistentMessageBuilder(p).build().toByteArray + case a: AtomicWrite ⇒ atomicWriteBuilder(a).build().toByteArray + case a: AtLeastOnceDeliverySnapshot ⇒ atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray + case s: StateChangeEvent ⇒ stateChangeBuilder(s).build.toByteArray + case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") } /** @@ -60,12 +61,13 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer * message's payload to a matching `akka.serialization.Serializer`. */ def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): Message = manifest match { - case None ⇒ persistent(PersistentMessage.parseFrom(bytes)) + case None ⇒ persistent(mf.PersistentMessage.parseFrom(bytes)) case Some(c) ⇒ c match { - case PersistentImplClass ⇒ persistent(PersistentMessage.parseFrom(bytes)) - case PersistentReprClass ⇒ persistent(PersistentMessage.parseFrom(bytes)) - case AtLeastOnceDeliverySnapshotClass ⇒ atLeastOnceDeliverySnapshot(AtLeastOnceDeliverySnapshot.parseFrom(bytes)) - case PersistentStateChangeEventClass ⇒ stateChange(PersistentStateChangeEvent.parseFrom(bytes)) + case PersistentImplClass ⇒ persistent(mf.PersistentMessage.parseFrom(bytes)) + case PersistentReprClass ⇒ persistent(mf.PersistentMessage.parseFrom(bytes)) + case AtomicWriteClass ⇒ atomicWrite(mf.AtomicWrite.parseFrom(bytes)) + case AtLeastOnceDeliverySnapshotClass ⇒ atLeastOnceDeliverySnapshot(mf.AtLeastOnceDeliverySnapshot.parseFrom(bytes)) + case PersistentStateChangeEventClass ⇒ stateChange(mf.PersistentStateChangeEvent.parseFrom(bytes)) case _ ⇒ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}") } } @@ -74,12 +76,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer // toBinary helpers // - def atLeastOnceDeliverySnapshotBuilder(snap: AtLeastOnceDeliverySnap): AtLeastOnceDeliverySnapshot.Builder = { - val builder = AtLeastOnceDeliverySnapshot.newBuilder + def atLeastOnceDeliverySnapshotBuilder(snap: AtLeastOnceDeliverySnapshot): mf.AtLeastOnceDeliverySnapshot.Builder = { + val builder = mf.AtLeastOnceDeliverySnapshot.newBuilder builder.setCurrentDeliveryId(snap.currentDeliveryId) snap.unconfirmedDeliveries.foreach { unconfirmed ⇒ val unconfirmedBuilder = - AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.newBuilder. + mf.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.newBuilder. setDeliveryId(unconfirmed.deliveryId). setDestination(unconfirmed.destination.toString). 
setPayload(persistentPayloadBuilder(unconfirmed.message.asInstanceOf[AnyRef])) @@ -88,15 +90,15 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer builder } - def stateChangeBuilder(stateChange: StateChangeEvent): PersistentStateChangeEvent.Builder = { - val builder = PersistentStateChangeEvent.newBuilder.setStateIdentifier(stateChange.stateIdentifier) + private[persistence] def stateChangeBuilder(stateChange: StateChangeEvent): mf.PersistentStateChangeEvent.Builder = { + val builder = mf.PersistentStateChangeEvent.newBuilder.setStateIdentifier(stateChange.stateIdentifier) stateChange.timeout match { case None ⇒ builder case Some(timeout) ⇒ builder.setTimeout(timeout.toString()) } } - def atLeastOnceDeliverySnapshot(atLeastOnceDeliverySnapshot: AtLeastOnceDeliverySnapshot): AtLeastOnceDeliverySnap = { + def atLeastOnceDeliverySnapshot(atLeastOnceDeliverySnapshot: mf.AtLeastOnceDeliverySnapshot): AtLeastOnceDeliverySnapshot = { import scala.collection.JavaConverters._ val unconfirmedDeliveries = new VectorBuilder[UnconfirmedDelivery]() atLeastOnceDeliverySnapshot.getUnconfirmedDeliveriesList().iterator().asScala foreach { next ⇒ @@ -104,19 +106,27 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer payload(next.getPayload)) } - AtLeastOnceDeliverySnap( + AtLeastOnceDeliverySnapshot( atLeastOnceDeliverySnapshot.getCurrentDeliveryId, unconfirmedDeliveries.result()) } - def stateChange(persistentStateChange: PersistentStateChangeEvent): StateChangeEvent = { + def stateChange(persistentStateChange: mf.PersistentStateChangeEvent): StateChangeEvent = { StateChangeEvent( persistentStateChange.getStateIdentifier, if (persistentStateChange.hasTimeout) Some(Duration(persistentStateChange.getTimeout).asInstanceOf[duration.FiniteDuration]) else None) } + private def atomicWriteBuilder(a: AtomicWrite) = { + val builder = mf.AtomicWrite.newBuilder + a.payload.foreach { p ⇒ + builder.addPayload(persistentMessageBuilder(p)) + } + builder + } + private def persistentMessageBuilder(persistent: PersistentRepr) = { - val builder = PersistentMessage.newBuilder + val builder = mf.PersistentMessage.newBuilder if (persistent.persistenceId != Undefined) builder.setPersistenceId(persistent.persistenceId) if (persistent.sender != Actor.noSender) builder.setSender(Serialization.serializedActorPath(persistent.sender)) @@ -130,7 +140,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer private def persistentPayloadBuilder(payload: AnyRef) = { def payloadBuilder() = { val serializer = serialization.findSerializerFor(payload) - val builder = PersistentPayload.newBuilder() + val builder = mf.PersistentPayload.newBuilder() serializer match { case ser2: SerializerWithStringManifest ⇒ @@ -158,7 +168,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer // fromBinary helpers // - private def persistent(persistentMessage: PersistentMessage): PersistentRepr = { + private def persistent(persistentMessage: mf.PersistentMessage): PersistentRepr = { PersistentRepr( payload(persistentMessage.getPayload), persistentMessage.getSequenceNr, @@ -168,7 +178,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else Actor.noSender) } - private def payload(persistentPayload: PersistentPayload): Any = { + private def atomicWrite(atomicWrite: mf.AtomicWrite): AtomicWrite = { + import 
scala.collection.JavaConverters._ + AtomicWrite(atomicWrite.getPayloadList.asScala.map(persistent)(collection.breakOut)) + } + + private def payload(persistentPayload: mf.PersistentPayload): Any = { val manifest = if (persistentPayload.hasPayloadManifest) persistentPayload.getPayloadManifest.toStringUtf8 else "" diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala index f59f86b996..7acaf56876 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala @@ -4,6 +4,7 @@ package akka.persistence +import scala.collection.immutable import akka.actor.{ OneForOneStrategy, _ } import akka.persistence.journal.AsyncWriteProxy import akka.persistence.journal.AsyncWriteTarget.{ ReplayFailure, ReplayMessages, ReplaySuccess, WriteMessages } @@ -14,11 +15,15 @@ import akka.util.Timeout import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.control.NoStackTrace +import scala.util.Try +import akka.persistence.journal.AsyncWriteJournal +import scala.util.Failure object PersistentActorFailureSpec { import PersistentActorSpec.{ Cmd, Evt, ExamplePersistentActor } class SimulatedException(msg: String) extends RuntimeException(msg) with NoStackTrace + class SimulatedSerializationException(msg: String) extends RuntimeException(msg) with NoStackTrace class FailingInmemJournal extends AsyncWriteProxy { import AsyncWriteProxy.SetStore @@ -36,6 +41,8 @@ object PersistentActorFailureSpec { def failingReceive: Receive = { case w: WriteMessages if isWrong(w) ⇒ throw new SimulatedException("Simulated Store failure") + case w: WriteMessages if checkSerializable(w).exists(_.isFailure) ⇒ + sender() ! checkSerializable(w) case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ val readFromStore = read(pid, fromSnr, toSnr, max) if (readFromStore.length == 0) @@ -50,8 +57,20 @@ object PersistentActorFailureSpec { def isWrong(w: WriteMessages): Boolean = w.messages.exists { - case PersistentRepr(Evt(s: String), _) ⇒ s.contains("wrong") - case x ⇒ false + case a: AtomicWrite ⇒ + a.payload.exists { case PersistentRepr(Evt(s: String), _) ⇒ s.contains("wrong") } + case _ ⇒ false + } + + def checkSerializable(w: WriteMessages): immutable.Seq[Try[Unit]] = + w.messages.collect { + case a: AtomicWrite ⇒ + (a.payload.collectFirst { + case PersistentRepr(Evt(s: String), _) if s.contains("not serializable") ⇒ s + }) match { + case Some(s) ⇒ Failure(new SimulatedSerializationException(s)) + case None ⇒ AsyncWriteJournal.successUnit + } } def isCorrupt(events: Seq[PersistentRepr]): Boolean = @@ -169,6 +188,17 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config( expectMsg("wrong") // reply before persistAsync expectTerminated(persistentActor) } + "call onPersistRejected and continue if persist rejected" in { + system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name) + val persistentActor = expectMsgType[ActorRef] + persistentActor ! Cmd("not serializable") + expectMsg("Rejected: not serializable-1") + expectMsg("Rejected: not serializable-2") + + persistentActor ! Cmd("a") + persistentActor ! 
GetState + expectMsg(List("a-1", "a-2")) + } "stop if receiveRecover fails" in { prepareFailingRecovery() @@ -179,6 +209,57 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config( expectTerminated(ref) } + "support resume when persist followed by exception" in { + system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[ThrowingActor1], name) + val persistentActor = expectMsgType[ActorRef] + persistentActor ! Cmd("a") + persistentActor ! Cmd("err") + persistentActor ! Cmd("b") + expectMsgType[SimulatedException] // from supervisor + persistentActor ! Cmd("c") + persistentActor ! GetState + expectMsg(List("a", "err", "b", "c")) + } + + "support restart when persist followed by exception" in { + system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[ThrowingActor1], name) + val persistentActor = expectMsgType[ActorRef] + persistentActor ! Cmd("a") + persistentActor ! Cmd("err") + persistentActor ! Cmd("b") + expectMsgType[SimulatedException] // from supervisor + persistentActor ! Cmd("c") + persistentActor ! GetState + expectMsg(List("a", "err", "b", "c")) + } + + "support resume when persist handler throws exception" in { + system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[ThrowingActor2], name) + val persistentActor = expectMsgType[ActorRef] + persistentActor ! Cmd("a") + persistentActor ! Cmd("b") + persistentActor ! Cmd("err") + persistentActor ! Cmd("c") + expectMsgType[SimulatedException] // from supervisor + persistentActor ! Cmd("d") + persistentActor ! GetState + expectMsg(List("a", "b", "c", "d")) + } + + "support restart when persist handler throws exception" in { + system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[ThrowingActor2], name) + val persistentActor = expectMsgType[ActorRef] + persistentActor ! Cmd("a") + persistentActor ! Cmd("b") + persistentActor ! Cmd("err") + persistentActor ! Cmd("c") + expectMsgType[SimulatedException] // from supervisor + persistentActor ! Cmd("d") + persistentActor ! GetState + // err was stored, and will be replayed + expectMsg(List("a", "b", "err", "c", "d")) + } + } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 7d2a48ef89..945591afa7 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -41,6 +41,12 @@ object PersistentActorSpec { case Cmd(data) ⇒ persistAll(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState) } + + override protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = + event match { + case Evt(data) ⇒ sender() ! 
s"Rejected: $data" + case _ ⇒ super.onPersistRejected(cause, event, seqNr) + } } class Behavior2PersistentActor(name: String) extends ExamplePersistentActor(name) { diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala index 1c39401149..915c633775 100644 --- a/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala +++ b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala @@ -7,10 +7,10 @@ package akka.persistence.journal.chaos import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.forkjoin.ThreadLocalRandom - import akka.persistence._ import akka.persistence.journal.SyncWriteJournal import akka.persistence.journal.inmem.InmemMessages +import scala.util.Try class WriteFailedException(ps: Seq[PersistentRepr]) extends TestException(s"write failed for payloads = [${ps.map(_.payload)}]") @@ -38,9 +38,13 @@ class ChaosJournal extends SyncWriteJournal { def random = ThreadLocalRandom.current - def writeMessages(messages: immutable.Seq[PersistentRepr]): Unit = - if (shouldFail(writeFailureRate)) throw new WriteFailedException(messages) - else messages.foreach(add) + def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] = + if (shouldFail(writeFailureRate)) throw new WriteFailedException(messages.flatMap(_.payload)) + else + for (a ← messages) yield { + a.payload.foreach(add) + SyncWriteJournal.successUnit + } def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit = { (1L to toSequenceNr).foreach { snr ⇒ diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index e47a1c36f6..009a16005c 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -283,6 +283,10 @@ object MessageSerializerRemotingSpec { class RemoteActor extends Actor { def receive = { case p @ PersistentRepr(MyPayload(data), _) ⇒ p.sender ! s"p${data}" + case a: AtomicWrite ⇒ + a.payload.foreach { + case p @ PersistentRepr(MyPayload(data), _) ⇒ p.sender ! s"p${data}" + } } } @@ -308,13 +312,21 @@ class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customS } "A message serializer" must { - "custom-serialize Persistent messages during remoting" in { + "custom-serialize PersistentRepr messages during remoting" in { // this also verifies serialization of PersistentRepr.sender, // because the RemoteActor will reply to the PersistentRepr.sender // is kept intact localActor ! PersistentRepr(MyPayload("a"), sender = testActor) expectMsg("p.a.") } + + "custom-serialize AtomicWrite messages during remoting" in { + val p1 = PersistentRepr(MyPayload("a"), sender = testActor) + val p2 = PersistentRepr(MyPayload("b"), sender = testActor) + localActor ! 
diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala
index e47a1c36f6..009a16005c 100644
--- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala
+++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala
@@ -283,6 +283,10 @@ object MessageSerializerRemotingSpec {
   class RemoteActor extends Actor {
     def receive = {
       case p @ PersistentRepr(MyPayload(data), _) ⇒ p.sender ! s"p${data}"
+      case a: AtomicWrite ⇒
+        a.payload.foreach {
+          case p @ PersistentRepr(MyPayload(data), _) ⇒ p.sender ! s"p${data}"
+        }
     }
   }
 
@@ -308,13 +312,21 @@ class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customS
 
   "A message serializer" must {
-    "custom-serialize Persistent messages during remoting" in {
+    "custom-serialize PersistentRepr messages during remoting" in {
       // this also verifies serialization of PersistentRepr.sender,
       // because the RemoteActor will reply to the PersistentRepr.sender,
       // which must be kept intact
       localActor ! PersistentRepr(MyPayload("a"), sender = testActor)
       expectMsg("p.a.")
     }
+
+    "custom-serialize AtomicWrite messages during remoting" in {
+      val p1 = PersistentRepr(MyPayload("a"), sender = testActor)
+      val p2 = PersistentRepr(MyPayload("b"), sender = testActor)
+      localActor ! AtomicWrite(List(p1, p2))
+      expectMsg("p.a.")
+      expectMsg("p.b.")
+    }
   }
 }
diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java
index de5f85de5f..3d8506de38 100644
--- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java
+++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java
@@ -16,7 +16,7 @@ import akka.persistence.journal.leveldb.SharedLeveldbJournal;
 import akka.persistence.journal.leveldb.SharedLeveldbStore;
 import akka.japi.pf.ReceiveBuilder;
 import scala.concurrent.Future;
-import akka.japi.Procedure;
+import java.util.function.Consumer;
 
 import java.util.Optional;
 
@@ -78,7 +78,7 @@ public class LambdaPersistencePluginDocTest {
   class MyAsyncJournal extends AsyncWriteJournal {
     @Override
-    public Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> messages) {
+    public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(Iterable<AtomicWrite> messages) {
       return null;
     }
 
@@ -91,7 +91,7 @@ public class LambdaPersistencePluginDocTest {
     public Future<Void> doAsyncReplayMessages(String persistenceId, long fromSequenceNr,
                                               long toSequenceNr, long max,
-                                              Procedure<PersistentRepr> replayCallback) {
+                                              Consumer<PersistentRepr> replayCallback) {
       return null;
     }
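The updated Java plugin API above reports per-write outcomes as `Optional<Exception>` values, mirroring the Scala side's `Try[Unit]`. A small sketch of the presumed correspondence, with an empty `Optional` standing for an accepted write (this mapping is inferred from the signatures in this changeset, not taken from its code):

```scala
import java.util.Optional
import scala.util.{ Failure, Success, Try }

// Presumed mapping between the Scala and Java result types: an empty
// Optional marks a successful write, a present Exception a rejected one.
def toJavaResult(result: Try[Unit]): Optional[Exception] = result match {
  case Success(_)            ⇒ Optional.empty[Exception]()
  case Failure(e: Exception) ⇒ Optional.of(e)
  case Failure(other)        ⇒ Optional.of(new RuntimeException(other))
}
```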