From 7ef5dd51db08444577461a60bb47cc3a8bed82d5 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Fri, 5 Nov 2021 08:58:58 +0100 Subject: [PATCH] Return revision in PersistenceTestKitDurableStateStore getObject (#30858) --- .../PersistenceTestKitDurableStateStore.scala | 45 ++++++++++--------- ...sistenceTestKitDurableStateStoreSpec.scala | 10 +++++ 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala index f61356f209..8dc66ea98d 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala @@ -43,24 +43,28 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem) private val EarliestOffset = 0L private val lastGlobalOffset = new AtomicLong(EarliestOffset) - def getObject(persistenceId: String): Future[GetObjectResult[A]] = this.synchronized { - Future.successful(GetObjectResult(store.get(persistenceId).map(_.value), 0)) + override def getObject(persistenceId: String): Future[GetObjectResult[A]] = this.synchronized { + Future.successful(store.get(persistenceId) match { + case Some(record) => GetObjectResult(Some(record.value), record.revision) + case None => GetObjectResult(None, 0) + }) } - def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = this.synchronized { - val globalOffset = lastGlobalOffset.incrementAndGet() - val record = Record(globalOffset, persistenceId, revision, value, tag) - store = store + (persistenceId -> record) - publisher ! record - Future.successful(Done) - } + override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = + this.synchronized { + val globalOffset = lastGlobalOffset.incrementAndGet() + val record = Record(globalOffset, persistenceId, revision, value, tag) + store = store + (persistenceId -> record) + publisher ! record + Future.successful(Done) + } - def deleteObject(persistenceId: String): Future[Done] = this.synchronized { + override def deleteObject(persistenceId: String): Future[Done] = this.synchronized { store = store - persistenceId Future.successful(Done) } - def changes(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = this.synchronized { + override def changes(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = this.synchronized { val fromOffset = offset match { case NoOffset => EarliestOffset case Sequence(fromOffset) => fromOffset @@ -86,15 +90,16 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem) .map(_.toDurableStateChange) } - def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = this.synchronized { - val currentGlobalOffset = lastGlobalOffset.get() - changes(tag, offset).takeWhile(_.offset match { - case Sequence(fromOffset) => - fromOffset < currentGlobalOffset - case offset => - throw new UnsupportedOperationException(s"$offset not supported in PersistenceTestKitDurableStateStore.") - }, inclusive = true) - } + override def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = + this.synchronized { + val currentGlobalOffset = lastGlobalOffset.get() + changes(tag, offset).takeWhile(_.offset match { + case Sequence(fromOffset) => + fromOffset < currentGlobalOffset + case offset => + throw new UnsupportedOperationException(s"$offset not supported in PersistenceTestKitDurableStateStore.") + }, inclusive = true) + } override def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] = this.synchronized { diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala index 94160c7b46..856072349b 100644 --- a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala @@ -36,6 +36,16 @@ class PersistenceTestKitDurableStateStoreSpec implicit val classic: akka.actor.ActorSystem = system.classicSystem "Persistent test kit state store" must { + "find individual objects" in { + val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem]) + val record = Record(1, "name-1") + val tag = "tag-1" + val persistenceId = "record-1" + stateStore.upsertObject(persistenceId, 1L, record, tag).futureValue + val updated = stateStore.getObject(persistenceId).futureValue + updated.value should be(Some(record)) + updated.revision should be(1L) + } "find tagged state changes ordered by upsert" in { val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem])