diff --git a/akka-bench-jmh/build.sbt b/akka-bench-jmh/build.sbt index d8b557d9e4..008d14d269 100644 --- a/akka-bench-jmh/build.sbt +++ b/akka-bench-jmh/build.sbt @@ -7,3 +7,5 @@ disablePlugins(Unidoc) AkkaBuild.defaultSettings AkkaBuild.dontPublishSettings + +Dependencies.benchJmh diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 3ebda5bf2a..f7f276b6e7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -35,6 +35,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { val cctr = resequencerCounter resequencerCounter += messages.foldLeft(0)((acc, m) ⇒ acc + m.size) + 1 + val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite]) val prepared = Try(preparePersistentBatch(messages)) val writeResult = (prepared match { case Success(prep) ⇒ @@ -44,7 +45,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { // exception from preparePersistentBatch => rejected Future.successful(messages.collect { case a: AtomicWrite ⇒ f }) }).map { results ⇒ - if (results.size != prepared.get.size) + if (results.nonEmpty && results.size != atomicWriteCount) throw new IllegalStateException("asyncWriteMessages returned invalid number of results. " + s"Expected [${prepared.get.size}], but got [${results.size}]") results @@ -54,7 +55,9 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { case Success(results) ⇒ resequencer ! Desequenced(WriteMessagesSuccessful, cctr, persistentActor, self) - val resultsIter = results.iterator + val resultsIter = + if (results.isEmpty) Iterator.fill(atomicWriteCount)(AsyncWriteJournal.successUnit) + else results.iterator var n = cctr + 1 messages.foreach { case a: AtomicWrite ⇒ @@ -157,7 +160,9 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * failure. * * The journal can also signal that it rejects individual messages (`AtomicWrite`) by - * the returned `immutable.Seq[Try[Unit]]`. The returned `Seq` must have as many elements + * the returned `immutable.Seq[Try[Unit]]`. It is possible but not mandatory to reduce + * number of allocations by returning `Future.successful(Nil)` for the happy path, + * i.e. when no messages are rejected. Otherwise the returned `Seq` must have as many elements * as the input `messages` `Seq`. Each `Try` element signals if the corresponding * `AtomicWrite` is rejected or not, with an exception describing the problem. Rejecting * a message means it was not stored, i.e. it must not be included in a later replay. @@ -166,8 +171,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * * Data store connection problems must not be signaled as rejections. * - * Note that it is possible to reduce number of allocations by - * caching some result `Seq` for the happy path, i.e. when no messages are rejected. + * It is possible but not mandatory to reduce number of allocations by returning + * `Future.successful(Nil)` for the happy path, i.e. when no messages are rejected. */ def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala index 360c6cfb70..4b9105457f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala @@ -25,6 +25,7 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le case WriteMessages(messages) ⇒ // TODO it would be nice to DRY this with AsyncWriteJournal, but this is using // AsyncWriteProxy message protocol + val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite]) val prepared = Try(preparePersistentBatch(messages)) val writeResult = (prepared match { case Success(prep) ⇒ @@ -34,7 +35,7 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le // exception from preparePersistentBatch => rejected Future.successful(messages.collect { case a: AtomicWrite ⇒ f }) }).map { results ⇒ - if (results.size != prepared.get.size) + if (results.nonEmpty && results.size != atomicWriteCount) throw new IllegalStateException("asyncWriteMessages returned invalid number of results. " + s"Expected [${prepared.get.size}], but got [${results.size}]") results diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 78c5277801..02b8b26d31 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -119,4 +119,6 @@ object Dependencies { val docs = l ++= Seq(Test.scalatest.value, Test.junit, Test.junitIntf, Docs.sprayJson, Docs.gson) val contrib = l ++= Seq(Test.junitIntf, Test.commonsIo) + + val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative) }