From 353d2ae6d6112017bc7247f865316be22db9705b Mon Sep 17 00:00:00 2001 From: Raymond Roestenburg Date: Thu, 8 Jul 2021 15:20:36 +0200 Subject: [PATCH] =?UTF-8?q?Reading=20until=20last=20global=20offset=20at?= =?UTF-8?q?=20time=20of=20calling=20currentChanges,=20a=E2=80=A6=20(#30372?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Reading until last global offset at time of calling currentChanges, added test. --- .../PersistenceTestKitDurableStateStore.scala | 4 +- ...sistenceTestKitDurableStateStoreSpec.scala | 84 +++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala index 95f04b3a30..59eeb416cb 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala @@ -86,8 +86,10 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem) } def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = this.synchronized { + val currentGlobalOffset = lastGlobalOffset.get() changes(tag, offset).takeWhile(_.offset match { - case Sequence(fromOffset) => fromOffset <= lastGlobalOffset.get() + case Sequence(fromOffset) => + fromOffset <= currentGlobalOffset case offset => throw new UnsupportedOperationException(s"$offset not supported in PersistenceTestKitDurableStateStore.") }) diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala new file mode 100644 index 0000000000..92bc45af04 --- /dev/null +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.persistence.testkit.query + +import akka.actor.ExtendedActorSystem +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.persistence.query.NoOffset +import akka.persistence.query.Sequence +import akka.persistence.query.DurableStateChange +import akka.persistence.testkit.state.scaladsl.PersistenceTestKitDurableStateStore +import akka.persistence.testkit.PersistenceTestKitDurableStateStorePlugin +import akka.stream.testkit.scaladsl.TestSink + +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +object PersistenceTestKitDurableStateStoreSpec { + val config = + PersistenceTestKitDurableStateStorePlugin.config.withFallback(ConfigFactory.parseString(""" + akka.loglevel = DEBUG + """)) + case class Record(id: Int, name: String) +} + +class PersistenceTestKitDurableStateStoreSpec + extends ScalaTestWithActorTestKit(PersistenceTestKitDurableStateStoreSpec.config) + with LogCapturing + with AnyWordSpecLike { + + import PersistenceTestKitDurableStateStoreSpec._ + + implicit val classic = system.classicSystem + + "Persistent test kit state store changes query" must { + + "find tagged state changes ordered by upsert" in { + val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem]) + val record = Record(1, "name-1") + val recordChange = Record(1, "my-name-1") + val tag = "tag-1" + stateStore.upsertObject("record-1", 1L, record, tag) + val testSink = stateStore.changes(tag, NoOffset).runWith(TestSink[DurableStateChange[Record]]()) + + val firstStateChange = testSink.request(1).expectNext() + firstStateChange.value should be(record) + firstStateChange.revision should be(1L) + + stateStore.upsertObject("record-1", 2L, recordChange, tag) + val secondStateChange = testSink.request(1).expectNext() + secondStateChange.value should be(recordChange) + secondStateChange.revision should be(2L) + secondStateChange.offset + .asInstanceOf[Sequence] + .value should be >= (firstStateChange.offset.asInstanceOf[Sequence].value) + } + + "find tagged current state changes ordered by upsert" in { + val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem]) + val record = Record(1, "name-1") + val tag = "tag-1" + + stateStore.upsertObject("record-1", 1L, record, tag) + + val testSinkCurrentChanges = + stateStore.currentChanges(tag, NoOffset).runWith(TestSink[DurableStateChange[Record]]()) + + stateStore.upsertObject("record-1", 2L, record.copy(name = "my-name-1-2"), tag) + stateStore.upsertObject("record-1", 3L, record.copy(name = "my-name-1-3"), tag) + + val currentStateChange = testSinkCurrentChanges.request(1).expectNext() + + currentStateChange.value should be(record) + currentStateChange.revision should be(1L) + testSinkCurrentChanges.request(1).expectComplete() + + val testSinkIllegalOffset = + stateStore.currentChanges(tag, Sequence(100L)).runWith(TestSink[DurableStateChange[Record]]()) + testSinkIllegalOffset.request(1).expectNoMessage() + } + } +}