diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/inmem/InmemJournalSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/inmem/InmemJournalSpec.scala new file mode 100644 index 0000000000..ada2ab9146 --- /dev/null +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/inmem/InmemJournalSpec.scala @@ -0,0 +1,13 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.persistence.journal.inmem + +import akka.persistence.CapabilityFlag +import akka.persistence.PersistenceSpec +import akka.persistence.journal.JournalSpec + +class InmemJournalSpec extends JournalSpec(config = PersistenceSpec.config("inmem", "InmemJournalSpec")) { + override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = CapabilityFlag.off() +} diff --git a/akka-persistence/src/main/mima-filters/2.5.x.backwards.excludes/issue-27994-inmem-journal.excludes b/akka-persistence/src/main/mima-filters/2.5.x.backwards.excludes/issue-27994-inmem-journal.excludes new file mode 100644 index 0000000000..13a6c43777 --- /dev/null +++ b/akka-persistence/src/main/mima-filters/2.5.x.backwards.excludes/issue-27994-inmem-journal.excludes @@ -0,0 +1,5 @@ +# #27994 changes to internal inmem journal +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.journal.inmem.InmemMessages.update") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.journal.inmem.InmemMessages.akka$persistence$journal$inmem$InmemMessages$$highestSequenceNumbers") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.journal.inmem.InmemMessages.akka$persistence$journal$inmem$InmemMessages$$highestSequenceNumbers_=") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.journal.inmem.InmemJournal.update") diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index 83730430f3..ec13207415 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -87,16 +87,16 @@ private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal w private[persistence] trait InmemMessages { // persistenceId -> persistent message var messages = Map.empty[String, Vector[PersistentRepr]] + // persistenceId -> highest used sequence number + private var highestSequenceNumbers = Map.empty[String, Long] - def add(p: PersistentRepr): Unit = + def add(p: PersistentRepr): Unit = { messages = messages + (messages.get(p.persistenceId) match { case Some(ms) => p.persistenceId -> (ms :+ p) case None => p.persistenceId -> Vector(p) }) - - def update(pid: String, snr: Long)(f: PersistentRepr => PersistentRepr): Unit = messages = messages.get(pid) match { - case Some(ms) => messages + (pid -> ms.map(sp => if (sp.sequenceNr == snr) f(sp) else sp)) - case None => messages + highestSequenceNumbers = + highestSequenceNumbers.updated(p.persistenceId, math.max(highestSequenceNr(p.persistenceId), p.sequenceNr)) } def delete(pid: String, snr: Long): Unit = messages = messages.get(pid) match { @@ -111,11 +111,7 @@ private[persistence] trait InmemMessages { } def highestSequenceNr(pid: String): Long = { - val snro = for { - ms <- messages.get(pid) - m <- ms.lastOption - } yield m.sequenceNr - snro.getOrElse(0L) + highestSequenceNumbers.getOrElse(pid, 0L) } private def safeLongToInt(l: Long): Int =