From a581e86debc8b4ccc8f0a083e06be00b1eca5600 Mon Sep 17 00:00:00 2001 From: James Roper Date: Mon, 25 Oct 2021 18:18:53 +1100 Subject: [PATCH] Fixed testkit durable state store current changes non completion (#30819) PersistenceTestKitDurableStateStore.currentChanges was correctly only returning the current changes, however it was not completing until an addition change was made. This fixes that. --- .../PersistenceTestKitDurableStateStore.scala | 4 ++-- ...sistenceTestKitDurableStateStoreSpec.scala | 21 +++++++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala index cc48978f8a..7ffa9f5e1f 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala @@ -90,10 +90,10 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem) val currentGlobalOffset = lastGlobalOffset.get() changes(tag, offset).takeWhile(_.offset match { case Sequence(fromOffset) => - fromOffset <= currentGlobalOffset + fromOffset < currentGlobalOffset case offset => throw new UnsupportedOperationException(s"$offset not supported in PersistenceTestKitDurableStateStore.") - }) + }, inclusive = true) } } diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala index 810845ef10..ab70fa17dc 100644 --- a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala @@ -89,5 +89,26 @@ class PersistenceTestKitDurableStateStoreSpec .runWith(TestSink[UpdatedDurableState[Record]]()) testSinkIllegalOffset.request(1).expectNoMessage() } + + "return current changes when there are no further changes" in { + val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem]) + val record = Record(1, "name-1") + val tag = "tag-1" + + stateStore.upsertObject("record-1", 1L, record, tag) + + val testSinkCurrentChanges = + stateStore + .currentChanges(tag, NoOffset) + .collect { case u: UpdatedDurableState[Record] => u } + .runWith(TestSink[UpdatedDurableState[Record]]()) + + val currentStateChange = testSinkCurrentChanges.request(1).expectNext() + + currentStateChange.value should be(record) + currentStateChange.revision should be(1L) + testSinkCurrentChanges.request(1).expectComplete() + } + } }