Fixed testkit durable state store current changes non completion (#30819)

PersistenceTestKitDurableStateStore.currentChanges was correctly only
returning the current changes, however it was not completing until an
addition change was made. This fixes that.
This commit is contained in:
James Roper 2021-10-25 18:18:53 +11:00 committed by GitHub
parent cdad7938b0
commit a581e86deb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 2 deletions

View file

@ -90,10 +90,10 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem)
val currentGlobalOffset = lastGlobalOffset.get()
changes(tag, offset).takeWhile(_.offset match {
case Sequence(fromOffset) =>
fromOffset <= currentGlobalOffset
fromOffset < currentGlobalOffset
case offset =>
throw new UnsupportedOperationException(s"$offset not supported in PersistenceTestKitDurableStateStore.")
})
}, inclusive = true)
}
}

View file

@ -89,5 +89,26 @@ class PersistenceTestKitDurableStateStoreSpec
.runWith(TestSink[UpdatedDurableState[Record]]())
testSinkIllegalOffset.request(1).expectNoMessage()
}
"return current changes when there are no further changes" in {
val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem])
val record = Record(1, "name-1")
val tag = "tag-1"
stateStore.upsertObject("record-1", 1L, record, tag)
val testSinkCurrentChanges =
stateStore
.currentChanges(tag, NoOffset)
.collect { case u: UpdatedDurableState[Record] => u }
.runWith(TestSink[UpdatedDurableState[Record]]())
val currentStateChange = testSinkCurrentChanges.request(1).expectNext()
currentStateChange.value should be(record)
currentStateChange.revision should be(1L)
testSinkCurrentChanges.request(1).expectComplete()
}
}
}