From 550aa10db193c9479b233cf611eeb241a03d9663 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 9 Oct 2015 16:00:06 +0200 Subject: [PATCH] =per Clarify concurrency of asyncWriteMessages --- .../persistence/journal/japi/AsyncWritePlugin.java | 14 ++++++++++++++ .../persistence/journal/AsyncWriteJournal.scala | 10 ++++++++++ 2 files changed, 24 insertions(+) diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java index 06619f0c5e..8e1c4822b9 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java @@ -59,6 +59,18 @@ interface AsyncWritePlugin { * * Note that it is possible to reduce number of allocations by caching some * result `Iterable` for the happy path, i.e. when no messages are rejected. + * + * Calls to this method are serialized by the enclosing journal actor. If you spawn + * work in asyncronous tasks it is alright that they complete the futures in any order, + * but the actual writes for a specific persistenceId should be serialized to avoid + * issues such as events of a later write are visible to consumers (query side, or replay) + * before the events of an earlier write are visible. This can also be done with + * consistent hashing if it is too fine grained to do it on the persistenceId level. + * Normally a `PersistentActor` will only have one outstanding write request to the journal but + * it may emit several write requests when `persistAsync` is used and the max batch size + * is reached. + * + * This call is protected with a circuit-breaker. */ Future>> doAsyncWriteMessages(Iterable messages); @@ -66,6 +78,8 @@ interface AsyncWritePlugin { * Java API, Plugin API: synchronously deletes all persistent messages up to * `toSequenceNr`. * + * This call is protected with a circuit-breaker. + * * @see AsyncRecoveryPlugin */ Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr); diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index fc1ae4e0b8..f2eaddee2c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -206,6 +206,16 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * It is possible but not mandatory to reduce number of allocations by returning * `Future.successful(Nil)` for the happy path, i.e. when no messages are rejected. * + * Calls to this method are serialized by the enclosing journal actor. If you spawn + * work in asyncronous tasks it is alright that they complete the futures in any order, + * but the actual writes for a specific persistenceId should be serialized to avoid + * issues such as events of a later write are visible to consumers (query side, or replay) + * before the events of an earlier write are visible. This can also be done with + * consistent hashing if it is too fine grained to do it on the persistenceId level. + * Normally a `PersistentActor` will only have one outstanding write request to the journal but + * it may emit several write requests when `persistAsync` is used and the max batch size + * is reached. + * * This call is protected with a circuit-breaker. */ def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]