inmem journal must not reset highestSequenceNr after journal cleanup, #27994 (#27995)

* add tck test (failed for exactly this)
* keep track of highest
This commit is contained in:
Patrik Nordwall 2019-10-15 10:37:48 +02:00 committed by Johan Andrén
parent 4bd3edaa32
commit 50a2355ec6
3 changed files with 24 additions and 10 deletions

View file

@ -0,0 +1,13 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.journal.inmem
import akka.persistence.CapabilityFlag
import akka.persistence.PersistenceSpec
import akka.persistence.journal.JournalSpec
class InmemJournalSpec extends JournalSpec(config = PersistenceSpec.config("inmem", "InmemJournalSpec")) {
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = CapabilityFlag.off()
}

View file

@ -0,0 +1,5 @@
# #27994 changes to internal inmem journal
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.journal.inmem.InmemMessages.update")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.journal.inmem.InmemMessages.akka$persistence$journal$inmem$InmemMessages$$highestSequenceNumbers")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.journal.inmem.InmemMessages.akka$persistence$journal$inmem$InmemMessages$$highestSequenceNumbers_=")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.journal.inmem.InmemJournal.update")

View file

@ -87,16 +87,16 @@ private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal w
private[persistence] trait InmemMessages { private[persistence] trait InmemMessages {
// persistenceId -> persistent message // persistenceId -> persistent message
var messages = Map.empty[String, Vector[PersistentRepr]] var messages = Map.empty[String, Vector[PersistentRepr]]
// persistenceId -> highest used sequence number
private var highestSequenceNumbers = Map.empty[String, Long]
def add(p: PersistentRepr): Unit = def add(p: PersistentRepr): Unit = {
messages = messages + (messages.get(p.persistenceId) match { messages = messages + (messages.get(p.persistenceId) match {
case Some(ms) => p.persistenceId -> (ms :+ p) case Some(ms) => p.persistenceId -> (ms :+ p)
case None => p.persistenceId -> Vector(p) case None => p.persistenceId -> Vector(p)
}) })
highestSequenceNumbers =
def update(pid: String, snr: Long)(f: PersistentRepr => PersistentRepr): Unit = messages = messages.get(pid) match { highestSequenceNumbers.updated(p.persistenceId, math.max(highestSequenceNr(p.persistenceId), p.sequenceNr))
case Some(ms) => messages + (pid -> ms.map(sp => if (sp.sequenceNr == snr) f(sp) else sp))
case None => messages
} }
def delete(pid: String, snr: Long): Unit = messages = messages.get(pid) match { def delete(pid: String, snr: Long): Unit = messages = messages.get(pid) match {
@ -111,11 +111,7 @@ private[persistence] trait InmemMessages {
} }
def highestSequenceNr(pid: String): Long = { def highestSequenceNr(pid: String): Long = {
val snro = for { highestSequenceNumbers.getOrElse(pid, 0L)
ms <- messages.get(pid)
m <- ms.lastOption
} yield m.sequenceNr
snro.getOrElse(0L)
} }
private def safeLongToInt(l: Long): Int = private def safeLongToInt(l: Long): Int =