Reading until last global offset at time of calling currentChanges, a… (#30372)

* Reading until last global offset at time of calling currentChanges, added test.
This commit is contained in:
Raymond Roestenburg 2021-07-08 15:20:36 +02:00 committed by GitHub
parent c4903ebe1e
commit 353d2ae6d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 87 additions and 1 deletions

View file

@ -86,8 +86,10 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem)
}
def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = this.synchronized {
val currentGlobalOffset = lastGlobalOffset.get()
changes(tag, offset).takeWhile(_.offset match {
case Sequence(fromOffset) => fromOffset <= lastGlobalOffset.get()
case Sequence(fromOffset) =>
fromOffset <= currentGlobalOffset
case offset =>
throw new UnsupportedOperationException(s"$offset not supported in PersistenceTestKitDurableStateStore.")
})

View file

@ -0,0 +1,84 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.query
import akka.actor.ExtendedActorSystem
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.persistence.query.NoOffset
import akka.persistence.query.Sequence
import akka.persistence.query.DurableStateChange
import akka.persistence.testkit.state.scaladsl.PersistenceTestKitDurableStateStore
import akka.persistence.testkit.PersistenceTestKitDurableStateStorePlugin
import akka.stream.testkit.scaladsl.TestSink
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
object PersistenceTestKitDurableStateStoreSpec {
val config =
PersistenceTestKitDurableStateStorePlugin.config.withFallback(ConfigFactory.parseString("""
akka.loglevel = DEBUG
"""))
case class Record(id: Int, name: String)
}
class PersistenceTestKitDurableStateStoreSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitDurableStateStoreSpec.config)
with LogCapturing
with AnyWordSpecLike {
import PersistenceTestKitDurableStateStoreSpec._
implicit val classic = system.classicSystem
"Persistent test kit state store changes query" must {
"find tagged state changes ordered by upsert" in {
val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem])
val record = Record(1, "name-1")
val recordChange = Record(1, "my-name-1")
val tag = "tag-1"
stateStore.upsertObject("record-1", 1L, record, tag)
val testSink = stateStore.changes(tag, NoOffset).runWith(TestSink[DurableStateChange[Record]]())
val firstStateChange = testSink.request(1).expectNext()
firstStateChange.value should be(record)
firstStateChange.revision should be(1L)
stateStore.upsertObject("record-1", 2L, recordChange, tag)
val secondStateChange = testSink.request(1).expectNext()
secondStateChange.value should be(recordChange)
secondStateChange.revision should be(2L)
secondStateChange.offset
.asInstanceOf[Sequence]
.value should be >= (firstStateChange.offset.asInstanceOf[Sequence].value)
}
"find tagged current state changes ordered by upsert" in {
val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem])
val record = Record(1, "name-1")
val tag = "tag-1"
stateStore.upsertObject("record-1", 1L, record, tag)
val testSinkCurrentChanges =
stateStore.currentChanges(tag, NoOffset).runWith(TestSink[DurableStateChange[Record]]())
stateStore.upsertObject("record-1", 2L, record.copy(name = "my-name-1-2"), tag)
stateStore.upsertObject("record-1", 3L, record.copy(name = "my-name-1-3"), tag)
val currentStateChange = testSinkCurrentChanges.request(1).expectNext()
currentStateChange.value should be(record)
currentStateChange.revision should be(1L)
testSinkCurrentChanges.request(1).expectComplete()
val testSinkIllegalOffset =
stateStore.currentChanges(tag, Sequence(100L)).runWith(TestSink[DurableStateChange[Record]]())
testSinkIllegalOffset.request(1).expectNoMessage()
}
}
}