+per #17894 Support Future(Nil) in asyncWriteMessages

Future.successful(Nil) is an alternative way to signal all good
in the happy case, for reduced allocations.

fix test failure

benchJmh leveldb dependency

revert leveldb change
This commit is contained in:
Patrik Nordwall 2015-08-11 12:22:38 +02:00
parent b0a6ffc217
commit 63ba2ae8e7
4 changed files with 16 additions and 6 deletions

View file

@ -7,3 +7,5 @@ disablePlugins(Unidoc)
AkkaBuild.defaultSettings
AkkaBuild.dontPublishSettings
Dependencies.benchJmh

View file

@ -35,6 +35,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
val cctr = resequencerCounter
resequencerCounter += messages.foldLeft(0)((acc, m) acc + m.size) + 1
val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite])
val prepared = Try(preparePersistentBatch(messages))
val writeResult = (prepared match {
case Success(prep)
@ -44,7 +45,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
// exception from preparePersistentBatch => rejected
Future.successful(messages.collect { case a: AtomicWrite f })
}).map { results
if (results.size != prepared.get.size)
if (results.nonEmpty && results.size != atomicWriteCount)
throw new IllegalStateException("asyncWriteMessages returned invalid number of results. " +
s"Expected [${prepared.get.size}], but got [${results.size}]")
results
@ -54,7 +55,9 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
case Success(results)
resequencer ! Desequenced(WriteMessagesSuccessful, cctr, persistentActor, self)
val resultsIter = results.iterator
val resultsIter =
if (results.isEmpty) Iterator.fill(atomicWriteCount)(AsyncWriteJournal.successUnit)
else results.iterator
var n = cctr + 1
messages.foreach {
case a: AtomicWrite
@ -157,7 +160,9 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
* failure.
*
* The journal can also signal that it rejects individual messages (`AtomicWrite`) by
* the returned `immutable.Seq[Try[Unit]]`. The returned `Seq` must have as many elements
* the returned `immutable.Seq[Try[Unit]]`. It is possible but not mandatory to reduce
* number of allocations by returning `Future.successful(Nil)` for the happy path,
* i.e. when no messages are rejected. Otherwise the returned `Seq` must have as many elements
* as the input `messages` `Seq`. Each `Try` element signals if the corresponding
* `AtomicWrite` is rejected or not, with an exception describing the problem. Rejecting
* a message means it was not stored, i.e. it must not be included in a later replay.
@ -166,8 +171,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
*
* Data store connection problems must not be signaled as rejections.
*
* Note that it is possible to reduce number of allocations by
* caching some result `Seq` for the happy path, i.e. when no messages are rejected.
* It is possible but not mandatory to reduce number of allocations by returning
* `Future.successful(Nil)` for the happy path, i.e. when no messages are rejected.
*/
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]

View file

@ -25,6 +25,7 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le
case WriteMessages(messages)
// TODO it would be nice to DRY this with AsyncWriteJournal, but this is using
// AsyncWriteProxy message protocol
val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite])
val prepared = Try(preparePersistentBatch(messages))
val writeResult = (prepared match {
case Success(prep)
@ -34,7 +35,7 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le
// exception from preparePersistentBatch => rejected
Future.successful(messages.collect { case a: AtomicWrite f })
}).map { results
if (results.size != prepared.get.size)
if (results.nonEmpty && results.size != atomicWriteCount)
throw new IllegalStateException("asyncWriteMessages returned invalid number of results. " +
s"Expected [${prepared.get.size}], but got [${results.size}]")
results

View file

@ -119,4 +119,6 @@ object Dependencies {
val docs = l ++= Seq(Test.scalatest.value, Test.junit, Test.junitIntf, Docs.sprayJson, Docs.gson)
val contrib = l ++= Seq(Test.junitIntf, Test.commonsIo)
val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative)
}