From 4638f5630e5eb3befee358fb797bd4c75a699cfc Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 25 Jun 2015 19:58:47 +0200 Subject: [PATCH] !per #17832 Remove SyncWriteJournal --- .../persistence/PersistencePluginDocTest.java | 14 +- akka-docs/rst/java/lambda-persistence.rst | 15 +- akka-docs/rst/java/persistence.rst | 17 ++- .../project/migration-guide-2.3.x-2.4.x.rst | 26 +++- .../PersistencePluginDocSpec.scala | 10 +- akka-docs/rst/scala/persistence.rst | 15 +- .../journal/japi/AsyncRecoveryPlugin.java | 4 +- .../journal/japi/AsyncWritePlugin.java | 5 +- .../journal/japi/SyncWritePlugin.java | 67 -------- .../akka/persistence/JournalProtocol.scala | 10 +- .../scala/akka/persistence/Persistent.scala | 5 +- .../persistence/journal/AsyncRecovery.scala | 4 +- .../journal/AsyncWriteJournal.scala | 44 +++--- .../journal/SyncWriteJournal.scala | 144 ------------------ .../journal/japi/SyncWriteJournal.scala | 31 ---- .../journal/leveldb/LeveldbJournal.scala | 2 +- .../journal/leveldb/LeveldbStore.scala | 59 +++---- .../journal/leveldb/SharedLeveldbStore.scala | 55 +++++++ .../journal/chaos/ChaosJournal.scala | 33 ++-- .../doc/LambdaPersistencePluginDocTest.java | 15 +- 20 files changed, 218 insertions(+), 357 deletions(-) delete mode 100644 akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java delete mode 100644 akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala delete mode 100644 akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala create mode 100644 akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala diff --git a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java index 8540e33c15..f739aecfdb 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java @@ -84,10 +84,20 @@ public class PersistencePluginDocTest { } class MyAsyncJournal extends AsyncWriteJournal { + //#sync-journal-plugin-api @Override - public Future>> doAsyncWriteMessages(Iterable messages) { - return null; + public Future>> doAsyncWriteMessages( + Iterable messages) { + try { + Iterable> result = new ArrayList>(); + // blocking call here... + // result.add(..) + return Futures.successful(result); + } catch (Exception e) { + return Futures.failed(e); + } } + //#sync-journal-plugin-api @Override public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) { diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index af84bc61a8..96eb781013 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -595,18 +595,17 @@ Plugin development requires the following imports: Journal plugin API ------------------ -A journal plugin either extends ``SyncWriteJournal`` or ``AsyncWriteJournal``. ``SyncWriteJournal`` is an -actor that should be extended when the storage backend API only supports synchronous, blocking writes. In this -case, the methods to be implemented are: +A journal plugin extends ``AsyncWriteJournal``. -.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java#sync-write-plugin-api - -``AsyncWriteJournal`` is an actor that should be extended if the storage backend API supports asynchronous, -non-blocking writes. In this case, the methods to be implemented are: +``AsyncWriteJournal`` is an actor and the methods to be implemented are: .. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java#async-write-plugin-api -Message replays and sequence number recovery are always asynchronous, therefore, any journal plugin must implement: +If the storage backend API only supports synchronous, blocking writes, the methods should be implemented as: + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java#sync-journal-plugin-api + +A journal plugin must also implement the methods defined in ``AsyncRecovery`` for replays and sequence number recovery: .. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java#async-replay-plugin-api diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index d7710d8604..a727c2b43b 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -597,21 +597,22 @@ Plugin development requires the following imports: .. includecode:: code/docs/persistence/PersistencePluginDocTest.java#plugin-imports +.. _journal-plugin-api-java: + Journal plugin API ------------------ -A journal plugin either extends ``SyncWriteJournal`` or ``AsyncWriteJournal``. ``SyncWriteJournal`` is an -actor that should be extended when the storage backend API only supports synchronous, blocking writes. In this -case, the methods to be implemented are: +A journal plugin extends ``AsyncWriteJournal``. -.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java#sync-write-plugin-api - -``AsyncWriteJournal`` is an actor that should be extended if the storage backend API supports asynchronous, -non-blocking writes. In this case, the methods to be implemented are: +``AsyncWriteJournal`` is an actor and the methods to be implemented are: .. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java#async-write-plugin-api -Message replays and sequence number recovery are always asynchronous, therefore, any journal plugin must implement: +If the storage backend API only supports synchronous, blocking writes, the methods should be implemented as: + +.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#sync-journal-plugin-api + +A journal plugin must also implement the methods defined in ``AsyncRecovery`` for replays and sequence number recovery: .. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java#async-replay-plugin-api diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index e0577a65ae..cb8bfe0170 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -321,11 +321,23 @@ non-permanent deletion ---------------------- The ``permanent`` flag in ``deleteMessages`` was removed. non-permanent deletes are not supported -any more. +any more. Events that were deleted with ``permanent=false`` with older version will +still not be replayed in this version. Persistence Plugin APIs ======================= +SyncWriteJournal removed +------------------------ + +``SyncWriteJournal`` removed in favor of using ``AsyncWriteJournal``. + +If the storage backend API only supports synchronous, blocking writes, +the methods can still be implemented in terms of the asynchronous API. +Example of how to do that is in included in the +See :ref:`Journal plugin API for Scala ` +or :ref:`Journal plugin API for Java `. + SnapshotStore: Snapshots can now be deleted asynchronously (and report failures) -------------------------------------------------------------------------------- Previously the ``SnapshotStore`` plugin SPI did not allow for asynchronous deletion of snapshots, @@ -394,7 +406,7 @@ slightly different than its Scala counterpart (where ``Option.apply(null)`` retu Atomic writes ------------- -``asyncWriteMessages`` and ``writeMessages`` takes a ``immutable.Seq[AtomicWrite]`` parameter instead of +``asyncWriteMessages`` takes a ``immutable.Seq[AtomicWrite]`` parameter instead of ``immutable.Seq[PersistentRepr]``. Each `AtomicWrite` message contains the single ``PersistentRepr`` that corresponds to the event that was @@ -410,8 +422,7 @@ describing the issue. This limitation should also be documented by the journal p Rejecting writes ---------------- -``asyncWriteMessages`` and ``writeMessages`` returns a ``Future[immutable.Seq[Try[Unit]]]`` or `` -``immutable.Seq[Try[Unit]]`` respectively. +``asyncWriteMessages`` returns a ``Future[immutable.Seq[Try[Unit]]]``. The journal can signal that it rejects individual messages (``AtomicWrite``) by the returned `immutable.Seq[Try[Unit]]`. The returned ``Seq`` must have as many elements as the input @@ -420,8 +431,8 @@ is rejected or not, with an exception describing the problem. Rejecting a messag was not stored, i.e. it must not be included in a later replay. Rejecting a message is typically done before attempting to store it, e.g. because of serialization error. -Read the API documentation of these methods for more information about the semantics of -rejections and failures. +Read the :ref:`API documentation ` of this method for more +information about the semantics of rejections and failures. asyncReplayMessages Java API ---------------------------- @@ -433,5 +444,6 @@ asyncDeleteMessagesTo --------------------- The ``permanent`` deletion flag was removed. Support for non-permanent deletions was -removed. +removed. Events that were deleted with ``permanent=false`` with older version will +still not be replayed in this version. diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala index b0b12ab6b8..507ba9c329 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -14,6 +14,7 @@ import scala.collection.immutable.Seq import scala.concurrent.Future import scala.util.Try import scala.concurrent.duration._ +import scala.util.control.NonFatal //#plugin-imports import akka.persistence._ @@ -127,7 +128,14 @@ trait SharedLeveldbPluginDocSpec { } class MyJournal extends AsyncWriteJournal { - def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = ??? + //#sync-journal-plugin-api + def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = + Future.fromTry(Try { + // blocking call here + ??? + }) + //#sync-journal-plugin-api + def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = ??? def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)( diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index d19bab7a91..ab91c55a1a 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -652,18 +652,17 @@ Plugin development requires the following imports: Journal plugin API ------------------ -A journal plugin either extends ``SyncWriteJournal`` or ``AsyncWriteJournal``. ``SyncWriteJournal`` is an -actor that should be extended when the storage backend API only supports synchronous, blocking writes. In this -case, the methods to be implemented are: +A journal plugin extends ``AsyncWriteJournal``. -.. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala#journal-plugin-api - -``AsyncWriteJournal`` is an actor that should be extended if the storage backend API supports asynchronous, -non-blocking writes. In this case, the methods to be implemented are: +``AsyncWriteJournal`` is an actor and the methods to be implemented are: .. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala#journal-plugin-api -Message replays and sequence number recovery are always asynchronous, therefore, any journal plugin must implement: +If the storage backend API only supports synchronous, blocking writes, the methods should be implemented as: + +.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#sync-journal-plugin-api + +A journal plugin must also implement the methods defined in ``AsyncRecovery`` for replays and sequence number recovery: .. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala#journal-plugin-api diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java index 0292b46700..c1b95cafbc 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java @@ -34,8 +34,8 @@ interface AsyncRecoveryPlugin { * @param replayCallback * called to replay a single message. Can be called from any thread. */ - Future doAsyncReplayMessages(String persistenceId, long fromSequenceNr, long toSequenceNr, long max, - Consumer replayCallback); + Future doAsyncReplayMessages(String persistenceId, long fromSequenceNr, + long toSequenceNr, long max, Consumer replayCallback); /** * Java API, Plugin API: asynchronously reads the highest stored sequence diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java index 0da067fd87..ed95939ec6 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java @@ -38,7 +38,7 @@ interface AsyncWritePlugin { * returned `Future` must be completed with failure. The `Future` must only be * completed with success when all messages in the batch have been confirmed * to be stored successfully, i.e. they will be readable, and visible, in a - * subsequent replay. If there are uncertainty about if the messages were + * subsequent replay. If there is uncertainty about if the messages were * stored or not the `Future` must be completed with failure. * * Data store connection problems must be signaled by completing the `Future` @@ -55,6 +55,9 @@ interface AsyncWritePlugin { * serialization error. * * Data store connection problems must not be signaled as rejections. + * + * Note that it is possible to reduce number of allocations by caching some + * result `Iterable` for the happy path, i.e. when no messages are rejected. */ Future>> doAsyncWriteMessages(Iterable messages); diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java deleted file mode 100644 index 964b94ade7..0000000000 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Copyright (C) 2009-2015 Typesafe Inc. - */ - -package akka.persistence.journal.japi; - -import java.util.Optional; - -import akka.persistence.*; -import scala.concurrent.Future; - -interface SyncWritePlugin { - //#sync-write-plugin-api - /** - * Java API, Plugin API: asynchronously writes a batch (`Iterable`) of - * persistent messages to the journal. - * - * The batch is only for performance reasons, i.e. all messages don't have to - * be written atomically. Higher throughput can typically be achieved by using - * batch inserts of many records compared inserting records one-by-one, but - * this aspect depends on the underlying data store and a journal - * implementation can implement it as efficient as possible with the - * assumption that the messages of the batch are unrelated. - * - * Each `AtomicWrite` message contains the single `PersistentRepr` that - * corresponds to the event that was passed to the `persist` method of the - * `PersistentActor`, or it contains several `PersistentRepr` that corresponds - * to the events that were passed to the `persistAll` method of the - * `PersistentActor`. All `PersistentRepr` of the `AtomicWrite` must be - * written to the data store atomically, i.e. all or none must be stored. If - * the journal (data store) cannot support atomic writes of multiple events it - * should reject such writes with an `Optional` with an - * `UnsupportedOperationException` describing the issue. This limitation - * should also be documented by the journal plugin. - * - * If there are failures when storing any of the messages in the batch the - * method must throw an exception. The method must only return normally when - * all messages in the batch have been confirmed to be stored successfully, - * i.e. they will be readable, and visible, in a subsequent replay. If there - * are uncertainty about if the messages were stored or not the method must - * throw an exception. - * - * Data store connection problems must be signaled by throwing an exception. - * - * The journal can also signal that it rejects individual messages - * (`AtomicWrite`) by the returned - * `Iterable<Optional<Exception>>`. The returned `Iterable` must - * have as many elements as the input `messages` `Iterable`. Each `Optional` - * element signals if the corresponding `AtomicWrite` is rejected or not, with - * an exception describing the problem. Rejecting a message means it was not - * stored, i.e. it must not be included in a later replay. Rejecting a message - * is typically done before attempting to store it, e.g. because of - * serialization error. - * - * Data store connection problems must not be signaled as rejections. - */ - Iterable> doWriteMessages(Iterable messages); - - /** - * Java API, Plugin API: synchronously deletes all persistent messages up to - * `toSequenceNr`. - * - * @see AsyncRecoveryPlugin - */ - void doDeleteMessagesTo(String persistenceId, long toSequenceNr); - //#sync-write-plugin-api -} diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index f8004ba80f..29e5cf4f3d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -42,7 +42,7 @@ private[persistence] object JournalProtocol { * @param persistentActor write requestor. */ final case class WriteMessages(messages: immutable.Seq[PersistentEnvelope], persistentActor: ActorRef, actorInstanceId: Int) - extends Request + extends Request with NoSerializationVerificationNeeded /** * Reply message to a successful [[WriteMessages]] request. This reply is sent to the requestor @@ -78,7 +78,7 @@ private[persistence] object JournalProtocol { * @param cause failure cause. */ final case class WriteMessageRejected(message: PersistentRepr, cause: Throwable, actorInstanceId: Int) - extends Response + extends Response with NoSerializationVerificationNeeded /** * Reply message to a failed [[WriteMessages]] request. For each contained [[PersistentRepr]] message @@ -88,7 +88,7 @@ private[persistence] object JournalProtocol { * @param cause failure cause. */ final case class WriteMessageFailure(message: PersistentRepr, cause: Throwable, actorInstanceId: Int) - extends Response + extends Response with NoSerializationVerificationNeeded /** * Reply message to a [[WriteMessages]] with a non-persistent message. @@ -96,7 +96,7 @@ private[persistence] object JournalProtocol { * @param message looped message. */ final case class LoopMessageSuccess(message: Any, actorInstanceId: Int) - extends Response + extends Response with NoSerializationVerificationNeeded /** * Request to replay messages to `persistentActor`. @@ -117,7 +117,7 @@ private[persistence] object JournalProtocol { * @param persistent replayed message. */ final case class ReplayedMessage(persistent: PersistentRepr) - extends Response with DeadLetterSuppression + extends Response with DeadLetterSuppression with NoSerializationVerificationNeeded /** * Reply message to a successful [[ReplayMessages]] request. This reply is sent to the requestor diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 900c4d296d..1d184cafea 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -7,10 +7,10 @@ package akka.persistence import scala.collection.immutable import java.lang.{ Iterable ⇒ JIterable } import java.util.{ List ⇒ JList } - import akka.actor.{ ActorContext, ActorRef } import akka.pattern.PromiseActorRef import akka.persistence.serialization.Message +import akka.actor.NoSerializationVerificationNeeded /** * INTERNAL API @@ -45,7 +45,6 @@ final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends Per /** * Plugin API: representation of a persistent message in the journal plugin API. * - * @see [[akka.persistence.journal.SyncWriteJournal]] * @see [[akka.persistence.journal.AsyncWriteJournal]] * @see [[akka.persistence.journal.AsyncRecovery]] */ @@ -152,7 +151,7 @@ private[persistence] final case class PersistentImpl( override val manifest: String, override val deleted: Boolean, override val sender: ActorRef, - override val writerUuid: String) extends PersistentRepr { + override val writerUuid: String) extends PersistentRepr with NoSerializationVerificationNeeded { def withPayload(payload: Any): PersistentRepr = copy(payload = payload) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala index c128c19bac..a009f100df 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala @@ -32,9 +32,9 @@ trait AsyncRecovery { * thread. * * @see [[AsyncWriteJournal]] - * @see [[SyncWriteJournal]] */ - def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] + def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, + max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] /** * Plugin API: asynchronously reads the highest stored sequence number for the diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index a45fc5156b..e2826ea6c7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -126,40 +126,47 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { //#journal-plugin-api /** - * Plugin API: asynchronously writes a batch (`Seq`) of persistent messages to the journal. + * Plugin API: asynchronously writes a batch (`Seq`) of persistent messages to the + * journal. * * The batch is only for performance reasons, i.e. all messages don't have to be written * atomically. Higher throughput can typically be achieved by using batch inserts of many - * records compared inserting records one-by-one, but this aspect depends on the underlying - * data store and a journal implementation can implement it as efficient as possible with - * the assumption that the messages of the batch are unrelated. + * records compared inserting records one-by-one, but this aspect depends on the + * underlying data store and a journal implementation can implement it as efficient as + * possible with the assumption that the messages of the batch are unrelated. * - * Each `AtomicWrite` message contains the single `PersistentRepr` that corresponds to the - * event that was passed to the `persist` method of the `PersistentActor`, or it contains - * several `PersistentRepr` that corresponds to the events that were passed to the `persistAll` - * method of the `PersistentActor`. All `PersistentRepr` of the `AtomicWrite` must be - * written to the data store atomically, i.e. all or none must be stored. - * If the journal (data store) cannot support atomic writes of multiple events it should - * reject such writes with a `Try` `Failure` with an `UnsupportedOperationException` - * describing the issue. This limitation should also be documented by the journal plugin. + * Each `AtomicWrite` message contains the single `PersistentRepr` that corresponds to + * the event that was passed to the `persist` method of the `PersistentActor`, or it + * contains several `PersistentRepr` that corresponds to the events that were passed + * to the `persistAll` method of the `PersistentActor`. All `PersistentRepr` of the + * `AtomicWrite` must be written to the data store atomically, i.e. all or none must + * be stored. If the journal (data store) cannot support atomic writes of multiple + * events it should reject such writes with a `Try` `Failure` with an + * `UnsupportedOperationException` describing the issue. This limitation should + * also be documented by the journal plugin. * * If there are failures when storing any of the messages in the batch the returned * `Future` must be completed with failure. The `Future` must only be completed with * success when all messages in the batch have been confirmed to be stored successfully, - * i.e. they will be readable, and visible, in a subsequent replay. If there are uncertainty - * about if the messages were stored or not the `Future` must be completed with failure. + * i.e. they will be readable, and visible, in a subsequent replay. If there is + * uncertainty about if the messages were stored or not the `Future` must be completed + * with failure. * * Data store connection problems must be signaled by completing the `Future` with * failure. * * The journal can also signal that it rejects individual messages (`AtomicWrite`) by * the returned `immutable.Seq[Try[Unit]]`. The returned `Seq` must have as many elements - * as the input `messages` `Seq`. Each `Try` element signals if the corresponding `AtomicWrite` - * is rejected or not, with an exception describing the problem. Rejecting a message means it - * was not stored, i.e. it must not be included in a later replay. Rejecting a message is - * typically done before attempting to store it, e.g. because of serialization error. + * as the input `messages` `Seq`. Each `Try` element signals if the corresponding + * `AtomicWrite` is rejected or not, with an exception describing the problem. Rejecting + * a message means it was not stored, i.e. it must not be included in a later replay. + * Rejecting a message is typically done before attempting to store it, e.g. because of + * serialization error. * * Data store connection problems must not be signaled as rejections. + * + * Note that it is possible to reduce number of allocations by + * caching some result `Seq` for the happy path, i.e. when no messages are rejected. */ def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] @@ -187,6 +194,7 @@ private[persistence] object AsyncWriteJournal { val successUnit: Success[Unit] = Success(()) final case class Desequenced(msg: Any, snr: Long, target: ActorRef, sender: ActorRef) + extends NoSerializationVerificationNeeded class Resequencer extends Actor { import scala.collection.mutable.Map diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala deleted file mode 100644 index 0cb27a91fe..0000000000 --- a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Copyright (C) 2009-2015 Typesafe Inc. - * Copyright (C) 2012-2013 Eligotech BV. - */ - -package akka.persistence.journal - -import scala.collection.immutable -import scala.util._ -import akka.actor.{ ActorLogging, Actor } -import akka.pattern.pipe -import akka.persistence._ - -object SyncWriteJournal { - val successUnit: Success[Unit] = Success(()) -} - -/** - * Abstract journal, optimized for synchronous writes. - */ -trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery with ActorLogging { - import JournalProtocol._ - import context.dispatcher - - private val extension = Persistence(context.system) - private val publish = extension.settings.internal.publishPluginCommands - - final def receive = { - case WriteMessages(messages, persistentActor, actorInstanceId) ⇒ - val writeResult = Try { - val prepared = preparePersistentBatch(messages) - val results = writeMessages(prepared) - if (results.size != prepared.size) - throw new IllegalStateException("writeMessages returned invalid number of results. " + - s"Expected [${prepared.size}], but got [${results.size}]") - results - } - writeResult match { - case Success(results) ⇒ - persistentActor ! WriteMessagesSuccessful - val resultsIter = results.iterator - messages.foreach { - case a: AtomicWrite ⇒ - resultsIter.next() match { - case Success(_) ⇒ - a.payload.foreach { p ⇒ - persistentActor.tell(WriteMessageSuccess(p, actorInstanceId), p.sender) - } - case Failure(e) ⇒ - a.payload.foreach { p ⇒ - persistentActor.tell(WriteMessageRejected(p, e, actorInstanceId), p.sender) - } - } - - case r: NonPersistentRepr ⇒ - persistentActor.tell(LoopMessageSuccess(r.payload, actorInstanceId), r.sender) - } - - case Failure(e) ⇒ - persistentActor ! WriteMessagesFailed(e) - messages.foreach { - case a: AtomicWrite ⇒ - a.payload.foreach { p ⇒ - persistentActor.tell(WriteMessageFailure(p, e, actorInstanceId), p.sender) - } - case r: NonPersistentRepr ⇒ - persistentActor.tell(LoopMessageSuccess(r.payload, actorInstanceId), r.sender) - } - throw e - } - - case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) ⇒ - asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒ - if (!p.deleted) // old records from 2.3 may still have the deleted flag - adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒ - persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), adaptedPersistentRepr.sender) - } - } map { - case _ ⇒ ReplayMessagesSuccess - } recover { - case e ⇒ ReplayMessagesFailure(e) - } pipeTo persistentActor onSuccess { - case _ if publish ⇒ context.system.eventStream.publish(r) - } - - case ReadHighestSequenceNr(fromSequenceNr, persistenceId, persistentActor) ⇒ - asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).map { - highest ⇒ ReadHighestSequenceNrSuccess(highest) - } recover { - case e ⇒ ReadHighestSequenceNrFailure(e) - } pipeTo persistentActor - - case d @ DeleteMessagesTo(persistenceId, toSequenceNr) ⇒ - Try(deleteMessagesTo(persistenceId, toSequenceNr)) match { - case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) - case Failure(e) ⇒ - } - } - - //#journal-plugin-api - /** - * * Plugin API: asynchronously writes a batch (`Seq`) of persistent messages to the journal. - * - * The batch is only for performance reasons, i.e. all messages don't have to be written - * atomically. Higher throughput can typically be achieved by using batch inserts of many - * records compared inserting records one-by-one, but this aspect depends on the underlying - * data store and a journal implementation can implement it as efficient as possible with - * the assumption that the messages of the batch are unrelated. - * - * Each `AtomicWrite` message contains the single `PersistentRepr` that corresponds to the - * event that was passed to the `persist` method of the `PersistentActor`, or it contains - * several `PersistentRepr` that corresponds to the events that were passed to the `persistAll` - * method of the `PersistentActor`. All `PersistentRepr` of the `AtomicWrite` must be - * written to the data store atomically, i.e. all or none must be stored. - * If the journal (data store) cannot support atomic writes of multiple events it should - * reject such writes with a `Try` `Failure` with an `UnsupportedOperationException` - * describing the issue. This limitation should also be documented by the journal plugin. - * - * If there are failures when storing any of the messages in the batch the method must - * throw an exception. The method must only return normally when all messages in the - * batch have been confirmed to be stored successfully, i.e. they will be readable, - * and visible, in a subsequent replay. If there are uncertainty about if the - * messages were stored or not the method must throw an exception. - * - * Data store connection problems must be signaled by throwing an exception. - * - * The journal can also signal that it rejects individual messages (`AtomicWrite`) by - * the returned `immutable.Seq[Try[Unit]]`. The returned `Seq` must have as many elements - * as the input `messages` `Seq`. Each `Try` element signals if the corresponding `AtomicWrite` - * is rejected or not, with an exception describing the problem. Rejecting a message means it - * was not stored, i.e. it must not be included in a later replay. Rejecting a message is - * typically done before attempting to store it, e.g. because of serialization error. - * - * Data store connection problems must not be signaled as rejections. - */ - def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] - - /** - * Plugin API: synchronously deletes all persistent messages up to `toSequenceNr` - * (inclusive). - */ - def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit - //#journal-plugin-api -} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala deleted file mode 100644 index eb1fb26016..0000000000 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright (C) 2009-2015 Typesafe Inc. - */ - -package akka.persistence.journal.japi - -import scala.collection.immutable -import scala.collection.JavaConverters._ -import akka.persistence._ -import akka.persistence.journal.{ SyncWriteJournal ⇒ SSyncWriteJournal } -import scala.util.Try -import scala.util.Failure - -import scala.concurrent.{ Await, Future } -import scala.concurrent.duration._ - -/** - * Java API: abstract journal, optimized for synchronous writes. - */ -abstract class SyncWriteJournal extends AsyncRecovery with SSyncWriteJournal with SyncWritePlugin { - import SSyncWriteJournal.successUnit - - final def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] = - doWriteMessages(messages.asJava).asScala.map { o ⇒ - if (o.isPresent) Failure(o.get) - else successUnit - }(collection.breakOut) - - final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit = - doDeleteMessagesTo(persistenceId, toSequenceNr) -} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index 7f84439cb0..95b98fb008 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -17,7 +17,7 @@ import akka.util.Helpers.ConfigOps * * Journal backed by a local LevelDB store. For production use. */ -private[persistence] class LeveldbJournal extends { val configPath = "akka.persistence.journal.leveldb" } with SyncWriteJournal with LeveldbStore +private[persistence] class LeveldbJournal extends { val configPath = "akka.persistence.journal.leveldb" } with AsyncWriteJournal with LeveldbStore /** * INTERNAL API. diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index d09b21c789..c582ef6467 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -6,15 +6,16 @@ package akka.persistence.journal.leveldb import java.io.File - import akka.actor._ import akka.persistence._ import akka.persistence.journal.{ WriteJournalBase, AsyncWriteTarget } import akka.serialization.SerializationExtension import org.iq80.leveldb._ - import scala.collection.immutable import scala.util._ +import scala.concurrent.Future +import scala.util.control.NonFatal +import akka.persistence.journal.AsyncWriteJournal /** * INTERNAL API. @@ -39,25 +40,32 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with import Key._ - def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] = - withBatch(batch ⇒ messages.map { a ⇒ - Try(a.payload.foreach(message ⇒ addToMessageBatch(message, batch))) + def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = + Future.fromTry(Try { + withBatch(batch ⇒ messages.map { a ⇒ + Try(a.payload.foreach(message ⇒ addToMessageBatch(message, batch))) + }) }) - def deleteMessagesTo(persistenceId: String, toSequenceNr: Long) = withBatch { batch ⇒ - val nid = numericId(persistenceId) + def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = + try Future.successful { + withBatch { batch ⇒ + val nid = numericId(persistenceId) - // seek to first existing message - val fromSequenceNr = withIterator { iter ⇒ - val startKey = Key(nid, 1L, 0) - iter.seek(keyToBytes(startKey)) - if (iter.hasNext) keyFromBytes(iter.peekNext().getKey).sequenceNr else Long.MaxValue - } + // seek to first existing message + val fromSequenceNr = withIterator { iter ⇒ + val startKey = Key(nid, 1L, 0) + iter.seek(keyToBytes(startKey)) + if (iter.hasNext) keyFromBytes(iter.peekNext().getKey).sequenceNr else Long.MaxValue + } - fromSequenceNr to toSequenceNr foreach { sequenceNr ⇒ - batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) + fromSequenceNr to toSequenceNr foreach { sequenceNr ⇒ + batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) + } + } + } catch { + case NonFatal(e) ⇒ Future.failed(e) } - } def leveldbSnapshot(): ReadOptions = leveldbReadOptions.snapshot(leveldb.getSnapshot) @@ -103,22 +111,3 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with } } -/** - * A LevelDB store that can be shared by multiple actor systems. The shared store must be - * set for each actor system that uses the store via `SharedLeveldbJournal.setStore`. The - * shared LevelDB store is for testing only. - */ -class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.leveldb-shared.store" } with LeveldbStore { - import AsyncWriteTarget._ - - def receive = { - case WriteMessages(msgs) ⇒ sender() ! writeMessages(preparePersistentBatch(msgs)) - case DeleteMessagesTo(pid, tsnr) ⇒ sender() ! deleteMessagesTo(pid, tsnr) - case ReadHighestSequenceNr(pid, fromSequenceNr) ⇒ sender() ! readHighestSequenceNr(numericId(pid)) - case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ - Try(replayMessages(numericId(pid), fromSnr, toSnr, max)(p ⇒ adaptFromJournal(p).foreach { sender() ! _ })) match { - case Success(max) ⇒ sender() ! ReplaySuccess - case Failure(cause) ⇒ sender() ! ReplayFailure(cause) - } - } -} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala new file mode 100644 index 0000000000..34170e8651 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.persistence.journal.leveldb + +import akka.persistence.journal.AsyncWriteTarget +import akka.pattern.pipe +import scala.util.Try +import scala.util.Success +import scala.util.Failure +import scala.util.control.NonFatal +import akka.persistence.AtomicWrite +import scala.concurrent.Future + +/** + * A LevelDB store that can be shared by multiple actor systems. The shared store must be + * set for each actor system that uses the store via `SharedLeveldbJournal.setStore`. The + * shared LevelDB store is for testing only. + */ +class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.leveldb-shared.store" } with LeveldbStore { + import AsyncWriteTarget._ + import context.dispatcher + + def receive = { + case WriteMessages(messages) ⇒ + val prepared = Try(preparePersistentBatch(messages)) + val writeResult = (prepared match { + case Success(prep) ⇒ + // in case the asyncWriteMessages throws + try asyncWriteMessages(prep) catch { case NonFatal(e) ⇒ Future.failed(e) } + case f @ Failure(_) ⇒ + // exception from preparePersistentBatch => rejected + Future.successful(messages.collect { case a: AtomicWrite ⇒ f }) + }).map { results ⇒ + if (results.size != prepared.get.size) + throw new IllegalStateException("asyncWriteMessages returned invalid number of results. " + + s"Expected [${prepared.get.size}], but got [${results.size}]") + results + } + + writeResult.pipeTo(sender()) + + case DeleteMessagesTo(pid, tsnr) ⇒ + asyncDeleteMessagesTo(pid, tsnr).pipeTo(sender()) + + case ReadHighestSequenceNr(pid, fromSequenceNr) ⇒ + asyncReadHighestSequenceNr(pid, fromSequenceNr).pipeTo(sender()) + + case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ + Try(replayMessages(numericId(pid), fromSnr, toSnr, max)(p ⇒ adaptFromJournal(p).foreach { sender() ! _ })) match { + case Success(max) ⇒ sender() ! ReplaySuccess + case Failure(cause) ⇒ sender() ! ReplayFailure(cause) + } + } +} diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala index 66dad14957..848cbc2073 100644 --- a/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala +++ b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala @@ -8,9 +8,10 @@ import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.forkjoin.ThreadLocalRandom import akka.persistence._ -import akka.persistence.journal.SyncWriteJournal +import akka.persistence.journal.AsyncWriteJournal import akka.persistence.journal.inmem.InmemMessages import scala.util.Try +import scala.util.control.NonFatal class WriteFailedException(ps: Seq[PersistentRepr]) extends TestException(s"write failed for payloads = [${ps.map(_.payload)}]") @@ -27,7 +28,7 @@ class ReadHighestFailedException */ private object ChaosJournalMessages extends InmemMessages -class ChaosJournal extends SyncWriteJournal { +class ChaosJournal extends AsyncWriteJournal { import ChaosJournalMessages.{ delete ⇒ del, _ } val config = context.system.settings.config.getConfig("akka.persistence.journal.chaos") @@ -38,17 +39,25 @@ class ChaosJournal extends SyncWriteJournal { def random = ThreadLocalRandom.current - def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] = - if (shouldFail(writeFailureRate)) throw new WriteFailedException(messages.flatMap(_.payload)) - else - for (a ← messages) yield { - a.payload.foreach(add) - SyncWriteJournal.successUnit - } + override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = + try Future.successful { + if (shouldFail(writeFailureRate)) throw new WriteFailedException(messages.flatMap(_.payload)) + else + for (a ← messages) yield { + a.payload.foreach(add) + AsyncWriteJournal.successUnit + } + } catch { + case NonFatal(e) ⇒ Future.failed(e) + } - def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit = { - (1L to toSequenceNr).foreach { snr ⇒ - del(persistenceId, snr) + override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = { + try Future.successful { + (1L to toSequenceNr).foreach { snr ⇒ + del(persistenceId, snr) + } + } catch { + case NonFatal(e) ⇒ Future.failed(e) } } diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java index dd70b39d8e..5adf109f99 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java @@ -15,6 +15,7 @@ import akka.actor.*; import akka.persistence.journal.leveldb.SharedLeveldbJournal; import akka.persistence.journal.leveldb.SharedLeveldbStore; import akka.japi.pf.ReceiveBuilder; +import java.util.ArrayList; import scala.concurrent.Future; import java.util.function.Consumer; import java.util.Optional; @@ -77,10 +78,20 @@ public class LambdaPersistencePluginDocTest { } class MyAsyncJournal extends AsyncWriteJournal { + //#sync-journal-plugin-api @Override - public Future>> doAsyncWriteMessages(Iterable messages) { - return null; + public Future>> doAsyncWriteMessages( + Iterable messages) { + try { + Iterable> result = new ArrayList>(); + // blocking call here... + // result.add(..) + return Futures.successful(result); + } catch (Exception e) { + return Futures.failed(e); + } } + //#sync-journal-plugin-api @Override public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {