diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 95a4a3cd0d..1e165c4ddb 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -303,11 +303,13 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * @param handler handler for each persisted `events` */ def persistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = { - events.foreach { event ⇒ - pendingStashingPersistInvocations += 1 - pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + if (events.nonEmpty) { + events.foreach { event ⇒ + pendingStashingPersistInvocations += 1 + pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + } + eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch } - eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch } @deprecated("use persistAll instead", "2.4") @@ -350,12 +352,13 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * @param events events to be persisted * @param handler handler for each persisted `events` */ - def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = { - events.foreach { event ⇒ - pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = + if (events.nonEmpty) { + events.foreach { event ⇒ + pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + } + eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch } - eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch - } @deprecated("use persistAllAsync instead", "2.4") def persistAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 6e8b03c402..d7e1d39cdb 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -4,13 +4,13 @@ package akka.persistence -import scala.collection.immutable import java.lang.{ Iterable ⇒ JIterable } import java.util.{ List ⇒ JList } -import akka.actor.{ ActorContext, ActorRef } -import akka.pattern.PromiseActorRef + +import akka.actor.{ ActorRef, NoSerializationVerificationNeeded } import akka.persistence.serialization.Message -import akka.actor.NoSerializationVerificationNeeded + +import scala.collection.immutable /** * INTERNAL API @@ -38,10 +38,11 @@ object AtomicWrite { } final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends PersistentEnvelope with Message { + require(payload.nonEmpty, "payload of AtomicWrite must not be empty!") // only check that all persistenceIds are equal when there's more than one in the Seq if (payload match { - case l: List[PersistentRepr] ⇒ l.tail.nonEmpty + case l: List[PersistentRepr] ⇒ l.tail.nonEmpty // avoids calling .size case v: Vector[PersistentRepr] ⇒ v.size > 1 case _ ⇒ true // some other collection type, let's just check }) require(payload.forall(_.persistenceId == payload.head.persistenceId), diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index ff36fc9723..47c74667aa 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -208,11 +208,11 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * * Calls to this method are serialized by the enclosing journal actor. If you spawn * work in asyncronous tasks it is alright that they complete the futures in any order, - * but the actual writes for a specific persistenceId should be serialized to avoid + * but the actual writes for a specific persistenceId should be serialized to avoid * issues such as events of a later write are visible to consumers (query side, or replay) * before the events of an earlier write are visible. This can also be done with * consistent hashing if it is too fine grained to do it on the persistenceId level. - * Normally a `PersistentActor` will only have one outstanding write request to the journal but + * Normally a `PersistentActor` will only have one outstanding write request to the journal but * it may emit several write requests when `persistAsync` is used and the max batch size * is reached. * diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 981498bb3e..384c1966e2 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -248,6 +248,22 @@ object PersistentActorSpec { persistAsync(event) { evt ⇒ sender() ! s"${evt.data}-b-${sendMsgCounter.incrementAndGet()}" } } } + class PersistAllNilPersistentActor(name: String) extends ExamplePersistentActor(name) { + + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data: String) if data contains "defer" ⇒ + deferAsync("before-nil")(sender() ! _) + persistAll(Nil)(_ ⇒ sender() ! "Nil") + deferAsync("after-nil")(sender() ! _) + sender() ! data + + case Cmd(data: String) if data contains "persist" ⇒ + persist("before-nil")(sender() ! _) + persistAll(Nil)(_ ⇒ sender() ! "Nil") + deferAsync("after-nil")(sender() ! _) + sender() ! data + } + } class AsyncPersistAndPersistMixedSyncAsyncSyncPersistentActor(name: String) extends ExamplePersistentActor(name) { var counter = 0 @@ -785,6 +801,17 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg("x-b-2") expectNoMsg(100.millis) } + "support calling persistAll with Nil" in { + val persistentActor = namedPersistentActor[PersistAllNilPersistentActor] + persistentActor ! Cmd("defer-x") + expectMsg("before-nil") + expectMsg("after-nil") + expectMsg("defer-x") + persistentActor ! Cmd("persist-x") + expectMsg("persist-x") + expectMsg("before-nil") + expectMsg("after-nil") + } "support a mix of persist calls (sync, async, sync) and persist calls in expected order" in { val persistentActor = namedPersistentActor[AsyncPersistAndPersistMixedSyncAsyncSyncPersistentActor] persistentActor ! Cmd("a")