Return revision in PersistenceTestKitDurableStateStore getObject (#30858)

This commit is contained in:
Arnout Engelen 2021-11-05 08:58:58 +01:00 committed by GitHub
parent a518a96c6a
commit 7ef5dd51db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 20 deletions

View file

@ -43,24 +43,28 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem)
private val EarliestOffset = 0L
private val lastGlobalOffset = new AtomicLong(EarliestOffset)
def getObject(persistenceId: String): Future[GetObjectResult[A]] = this.synchronized {
Future.successful(GetObjectResult(store.get(persistenceId).map(_.value), 0))
override def getObject(persistenceId: String): Future[GetObjectResult[A]] = this.synchronized {
Future.successful(store.get(persistenceId) match {
case Some(record) => GetObjectResult(Some(record.value), record.revision)
case None => GetObjectResult(None, 0)
})
}
def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = this.synchronized {
val globalOffset = lastGlobalOffset.incrementAndGet()
val record = Record(globalOffset, persistenceId, revision, value, tag)
store = store + (persistenceId -> record)
publisher ! record
Future.successful(Done)
}
override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] =
this.synchronized {
val globalOffset = lastGlobalOffset.incrementAndGet()
val record = Record(globalOffset, persistenceId, revision, value, tag)
store = store + (persistenceId -> record)
publisher ! record
Future.successful(Done)
}
def deleteObject(persistenceId: String): Future[Done] = this.synchronized {
override def deleteObject(persistenceId: String): Future[Done] = this.synchronized {
store = store - persistenceId
Future.successful(Done)
}
def changes(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = this.synchronized {
override def changes(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = this.synchronized {
val fromOffset = offset match {
case NoOffset => EarliestOffset
case Sequence(fromOffset) => fromOffset
@ -86,15 +90,16 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem)
.map(_.toDurableStateChange)
}
def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = this.synchronized {
val currentGlobalOffset = lastGlobalOffset.get()
changes(tag, offset).takeWhile(_.offset match {
case Sequence(fromOffset) =>
fromOffset < currentGlobalOffset
case offset =>
throw new UnsupportedOperationException(s"$offset not supported in PersistenceTestKitDurableStateStore.")
}, inclusive = true)
}
override def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] =
this.synchronized {
val currentGlobalOffset = lastGlobalOffset.get()
changes(tag, offset).takeWhile(_.offset match {
case Sequence(fromOffset) =>
fromOffset < currentGlobalOffset
case offset =>
throw new UnsupportedOperationException(s"$offset not supported in PersistenceTestKitDurableStateStore.")
}, inclusive = true)
}
override def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] =
this.synchronized {

View file

@ -36,6 +36,16 @@ class PersistenceTestKitDurableStateStoreSpec
implicit val classic: akka.actor.ActorSystem = system.classicSystem
"Persistent test kit state store" must {
"find individual objects" in {
val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem])
val record = Record(1, "name-1")
val tag = "tag-1"
val persistenceId = "record-1"
stateStore.upsertObject(persistenceId, 1L, record, tag).futureValue
val updated = stateStore.getObject(persistenceId).futureValue
updated.value should be(Some(record))
updated.revision should be(1L)
}
"find tagged state changes ordered by upsert" in {
val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem])