Merge pull request #18185 from akka/wip-17894-asyncWriteMessages-patriknw

+per #17894 Support Future(Nil) in asyncWriteMessages
This commit is contained in:
Patrik Nordwall 2015-08-12 16:19:09 +02:00
commit 9993e032cb
4 changed files with 16 additions and 6 deletions

View file

@ -7,3 +7,5 @@ disablePlugins(Unidoc)
AkkaBuild.defaultSettings
AkkaBuild.dontPublishSettings
Dependencies.benchJmh

View file

@ -35,6 +35,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
val cctr = resequencerCounter
resequencerCounter += messages.foldLeft(0)((acc, m) acc + m.size) + 1
val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite])
val prepared = Try(preparePersistentBatch(messages))
val writeResult = (prepared match {
case Success(prep)
@ -44,7 +45,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
// exception from preparePersistentBatch => rejected
Future.successful(messages.collect { case a: AtomicWrite f })
}).map { results
if (results.size != prepared.get.size)
if (results.nonEmpty && results.size != atomicWriteCount)
throw new IllegalStateException("asyncWriteMessages returned invalid number of results. " +
s"Expected [${prepared.get.size}], but got [${results.size}]")
results
@ -54,7 +55,9 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
case Success(results)
resequencer ! Desequenced(WriteMessagesSuccessful, cctr, persistentActor, self)
val resultsIter = results.iterator
val resultsIter =
if (results.isEmpty) Iterator.fill(atomicWriteCount)(AsyncWriteJournal.successUnit)
else results.iterator
var n = cctr + 1
messages.foreach {
case a: AtomicWrite
@ -157,7 +160,9 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
* failure.
*
* The journal can also signal that it rejects individual messages (`AtomicWrite`) by
* the returned `immutable.Seq[Try[Unit]]`. The returned `Seq` must have as many elements
* the returned `immutable.Seq[Try[Unit]]`. It is possible but not mandatory to reduce
* number of allocations by returning `Future.successful(Nil)` for the happy path,
* i.e. when no messages are rejected. Otherwise the returned `Seq` must have as many elements
* as the input `messages` `Seq`. Each `Try` element signals if the corresponding
* `AtomicWrite` is rejected or not, with an exception describing the problem. Rejecting
* a message means it was not stored, i.e. it must not be included in a later replay.
@ -166,8 +171,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
*
* Data store connection problems must not be signaled as rejections.
*
* Note that it is possible to reduce number of allocations by
* caching some result `Seq` for the happy path, i.e. when no messages are rejected.
* It is possible but not mandatory to reduce number of allocations by returning
* `Future.successful(Nil)` for the happy path, i.e. when no messages are rejected.
*/
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]

View file

@ -25,6 +25,7 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le
case WriteMessages(messages)
// TODO it would be nice to DRY this with AsyncWriteJournal, but this is using
// AsyncWriteProxy message protocol
val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite])
val prepared = Try(preparePersistentBatch(messages))
val writeResult = (prepared match {
case Success(prep)
@ -34,7 +35,7 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le
// exception from preparePersistentBatch => rejected
Future.successful(messages.collect { case a: AtomicWrite f })
}).map { results
if (results.size != prepared.get.size)
if (results.nonEmpty && results.size != atomicWriteCount)
throw new IllegalStateException("asyncWriteMessages returned invalid number of results. " +
s"Expected [${prepared.get.size}], but got [${results.size}]")
results

View file

@ -119,4 +119,6 @@ object Dependencies {
val docs = l ++= Seq(Test.scalatest.value, Test.junit, Test.junitIntf, Docs.sprayJson, Docs.gson)
val contrib = l ++= Seq(Test.junitIntf, Test.commonsIo)
val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative)
}