diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 1d184cafea..d0906c395f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -38,6 +38,20 @@ object AtomicWrite { } final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends PersistentEnvelope with Message { + + // only check that all persistenceIds are equal when there's more than one in the Seq + if (payload match { + case l: List[PersistentRepr] => l.tail.nonEmpty + case v: Vector[PersistentRepr] => v.size > 1 + case _ => true // some other collection type, let's just check + }) require(payload.forall(_.persistenceId == payload.head.persistenceId), + "AtomicWrite must contain messages for the same persistenceId, " + + s"yet different persistenceIds found: ${payload.map(_.persistenceId).toSet}") + + def persistenceId = payload.head.persistenceId + def lowestSequenceNr = payload.head.sequenceNr // this assumes they're gapless; they should be (it is only our code creating AWs) + def highestSequenceNr = payload.last.sequenceNr // TODO: could be optimised, since above require traverses already + override def sender: ActorRef = ActorRef.noSender override def size: Int = payload.size } diff --git a/akka-persistence/src/test/scala/akka/persistence/AtomicWriteSpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtomicWriteSpec.scala new file mode 100644 index 0000000000..d13b49adf6 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/AtomicWriteSpec.scala @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.persistence + +import org.scalatest.{ Matchers, WordSpec } + +class AtomicWriteSpec extends WordSpec with Matchers { + + "AtomicWrite" must { + "only contain messages for the same persistence id" in { + AtomicWrite( + PersistentRepr("", 1, "p1") :: + PersistentRepr("", 2, "p1") :: Nil).persistenceId should ===("p1") + + intercept[IllegalArgumentException] { + AtomicWrite( + PersistentRepr("", 1, "p1") :: + PersistentRepr("", 2, "p1") :: + PersistentRepr("", 3, "p2") :: Nil) + } + } + + "have highestSequenceNr" in { + AtomicWrite( + PersistentRepr("", 1, "p1") :: + PersistentRepr("", 2, "p1") :: + PersistentRepr("", 3, "p1") :: Nil).highestSequenceNr should ===(3) + } + + "have lowestSequenceNr" in { + AtomicWrite( + PersistentRepr("", 2, "p1") :: + PersistentRepr("", 3, "p1") :: + PersistentRepr("", 4, "p1") :: Nil).lowestSequenceNr should ===(2) + } + } + +}