Merge pull request #18772 from ktoso/wip-persist-nil-ktoso
=per #18728 allow persistAll(Nil)
This commit is contained in:
commit
81cba2e580
3 changed files with 45 additions and 14 deletions
|
|
@ -303,11 +303,13 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
|
||||||
* @param handler handler for each persisted `events`
|
* @param handler handler for each persisted `events`
|
||||||
*/
|
*/
|
||||||
def persistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
|
def persistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
|
||||||
events.foreach { event ⇒
|
if (events.nonEmpty) {
|
||||||
pendingStashingPersistInvocations += 1
|
events.foreach { event ⇒
|
||||||
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
|
pendingStashingPersistInvocations += 1
|
||||||
|
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
|
||||||
|
}
|
||||||
|
eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch
|
||||||
}
|
}
|
||||||
eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@deprecated("use persistAll instead", "2.4")
|
@deprecated("use persistAll instead", "2.4")
|
||||||
|
|
@ -350,12 +352,13 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
|
||||||
* @param events events to be persisted
|
* @param events events to be persisted
|
||||||
* @param handler handler for each persisted `events`
|
* @param handler handler for each persisted `events`
|
||||||
*/
|
*/
|
||||||
def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = {
|
def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit =
|
||||||
events.foreach { event ⇒
|
if (events.nonEmpty) {
|
||||||
pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
|
events.foreach { event ⇒
|
||||||
|
pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
|
||||||
|
}
|
||||||
|
eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch
|
||||||
}
|
}
|
||||||
eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch
|
|
||||||
}
|
|
||||||
|
|
||||||
@deprecated("use persistAllAsync instead", "2.4")
|
@deprecated("use persistAllAsync instead", "2.4")
|
||||||
def persistAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit =
|
def persistAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit =
|
||||||
|
|
|
||||||
|
|
@ -4,13 +4,13 @@
|
||||||
|
|
||||||
package akka.persistence
|
package akka.persistence
|
||||||
|
|
||||||
import scala.collection.immutable
|
|
||||||
import java.lang.{ Iterable ⇒ JIterable }
|
import java.lang.{ Iterable ⇒ JIterable }
|
||||||
import java.util.{ List ⇒ JList }
|
import java.util.{ List ⇒ JList }
|
||||||
import akka.actor.{ ActorContext, ActorRef }
|
|
||||||
import akka.pattern.PromiseActorRef
|
import akka.actor.{ ActorRef, NoSerializationVerificationNeeded }
|
||||||
import akka.persistence.serialization.Message
|
import akka.persistence.serialization.Message
|
||||||
import akka.actor.NoSerializationVerificationNeeded
|
|
||||||
|
import scala.collection.immutable
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
@ -38,10 +38,11 @@ object AtomicWrite {
|
||||||
}
|
}
|
||||||
|
|
||||||
final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends PersistentEnvelope with Message {
|
final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends PersistentEnvelope with Message {
|
||||||
|
require(payload.nonEmpty, "payload of AtomicWrite must not be empty!")
|
||||||
|
|
||||||
// only check that all persistenceIds are equal when there's more than one in the Seq
|
// only check that all persistenceIds are equal when there's more than one in the Seq
|
||||||
if (payload match {
|
if (payload match {
|
||||||
case l: List[PersistentRepr] ⇒ l.tail.nonEmpty
|
case l: List[PersistentRepr] ⇒ l.tail.nonEmpty // avoids calling .size
|
||||||
case v: Vector[PersistentRepr] ⇒ v.size > 1
|
case v: Vector[PersistentRepr] ⇒ v.size > 1
|
||||||
case _ ⇒ true // some other collection type, let's just check
|
case _ ⇒ true // some other collection type, let's just check
|
||||||
}) require(payload.forall(_.persistenceId == payload.head.persistenceId),
|
}) require(payload.forall(_.persistenceId == payload.head.persistenceId),
|
||||||
|
|
|
||||||
|
|
@ -248,6 +248,22 @@ object PersistentActorSpec {
|
||||||
persistAsync(event) { evt ⇒ sender() ! s"${evt.data}-b-${sendMsgCounter.incrementAndGet()}" }
|
persistAsync(event) { evt ⇒ sender() ! s"${evt.data}-b-${sendMsgCounter.incrementAndGet()}" }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
class PersistAllNilPersistentActor(name: String) extends ExamplePersistentActor(name) {
|
||||||
|
|
||||||
|
val receiveCommand: Receive = commonBehavior orElse {
|
||||||
|
case Cmd(data: String) if data contains "defer" ⇒
|
||||||
|
deferAsync("before-nil")(sender() ! _)
|
||||||
|
persistAll(Nil)(_ ⇒ sender() ! "Nil")
|
||||||
|
deferAsync("after-nil")(sender() ! _)
|
||||||
|
sender() ! data
|
||||||
|
|
||||||
|
case Cmd(data: String) if data contains "persist" ⇒
|
||||||
|
persist("before-nil")(sender() ! _)
|
||||||
|
persistAll(Nil)(_ ⇒ sender() ! "Nil")
|
||||||
|
deferAsync("after-nil")(sender() ! _)
|
||||||
|
sender() ! data
|
||||||
|
}
|
||||||
|
}
|
||||||
class AsyncPersistAndPersistMixedSyncAsyncSyncPersistentActor(name: String) extends ExamplePersistentActor(name) {
|
class AsyncPersistAndPersistMixedSyncAsyncSyncPersistentActor(name: String) extends ExamplePersistentActor(name) {
|
||||||
|
|
||||||
var counter = 0
|
var counter = 0
|
||||||
|
|
@ -785,6 +801,17 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
|
||||||
expectMsg("x-b-2")
|
expectMsg("x-b-2")
|
||||||
expectNoMsg(100.millis)
|
expectNoMsg(100.millis)
|
||||||
}
|
}
|
||||||
|
"support calling persistAll with Nil" in {
|
||||||
|
val persistentActor = namedPersistentActor[PersistAllNilPersistentActor]
|
||||||
|
persistentActor ! Cmd("defer-x")
|
||||||
|
expectMsg("before-nil")
|
||||||
|
expectMsg("after-nil")
|
||||||
|
expectMsg("defer-x")
|
||||||
|
persistentActor ! Cmd("persist-x")
|
||||||
|
expectMsg("persist-x")
|
||||||
|
expectMsg("before-nil")
|
||||||
|
expectMsg("after-nil")
|
||||||
|
}
|
||||||
"support a mix of persist calls (sync, async, sync) and persist calls in expected order" in {
|
"support a mix of persist calls (sync, async, sync) and persist calls in expected order" in {
|
||||||
val persistentActor = namedPersistentActor[AsyncPersistAndPersistMixedSyncAsyncSyncPersistentActor]
|
val persistentActor = namedPersistentActor[AsyncPersistAndPersistMixedSyncAsyncSyncPersistentActor]
|
||||||
persistentActor ! Cmd("a")
|
persistentActor ! Cmd("a")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue