diff --git a/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceTestKitPolicySampleTest.java b/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceTestKitPolicySampleTest.java index 090540dc56..88cbb30a92 100644 --- a/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceTestKitPolicySampleTest.java +++ b/akka-docs/src/test/java/jdocs/persistence/testkit/PersistenceTestKitPolicySampleTest.java @@ -56,7 +56,7 @@ public class PersistenceTestKitPolicySampleTest extends AbstractJavaTest { static class SampleEventStoragePolicy implements ProcessingPolicy { @Override - public ProcessingResult tryProcess(String persistenceId, JournalOperation processingUnit) { + public ProcessingResult tryProcess(String processId, JournalOperation processingUnit) { if (processingUnit instanceof WriteEvents) { return StorageFailure.create(); } else { diff --git a/akka-docs/src/test/java/jdocs/persistence/testkit/TestKitExamples.java b/akka-docs/src/test/java/jdocs/persistence/testkit/TestKitExamples.java index d090c840d0..2ae7f4e630 100644 --- a/akka-docs/src/test/java/jdocs/persistence/testkit/TestKitExamples.java +++ b/akka-docs/src/test/java/jdocs/persistence/testkit/TestKitExamples.java @@ -29,7 +29,7 @@ public class TestKitExamples { int count = 1; @Override - public ProcessingResult tryProcess(String persistenceId, JournalOperation processingUnit) { + public ProcessingResult tryProcess(String processId, JournalOperation processingUnit) { // check the type of operation and react with success or with reject or with failure. // if you return ProcessingSuccess the operation will be performed, otherwise not. if (count < 10) { @@ -64,7 +64,7 @@ public class TestKitExamples { int count = 1; @Override - public ProcessingResult tryProcess(String persistenceId, SnapshotOperation processingUnit) { + public ProcessingResult tryProcess(String processId, SnapshotOperation processingUnit) { // check the type of operation and react with success or with failure. // if you return ProcessingSuccess the operation will be performed, otherwise not. if (count < 10) { diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala index ec77e83b82..0366e0c4e8 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala @@ -104,6 +104,16 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, Per } } + def tryRead(processId: String, predicate: PersistentRepr => Boolean): immutable.Seq[PersistentRepr] = { + val batch = readAll().filter(predicate).toVector.sortBy(_.timestamp) + + currentPolicy.tryProcess(processId, ReadEvents(batch)) match { + case ProcessingSuccess => batch + case Reject(ex) => throw ex + case StorageFailure(ex) => throw ex + } + } + def tryReadSeqNumber(persistenceId: String): Long = { currentPolicy.tryProcess(persistenceId, ReadSeqNum) match { case ProcessingSuccess => getHighestSeqNumber(persistenceId) diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/ProcessingPolicy.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/ProcessingPolicy.scala index 6ad2b16074..d115f729e6 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/ProcessingPolicy.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/ProcessingPolicy.scala @@ -22,10 +22,11 @@ trait ProcessingPolicy[U] { * If you need this operation to succeed return [[ProcessingSuccess]], * otherwise you should return some of the [[ProcessingFailure]]'s. * + * @param processId persistenceId or other id of the processing operation * @param processingUnit details about current operation to be executed * @return needed result of processing the operation */ - def tryProcess(persistenceId: String, processingUnit: U): ProcessingResult + def tryProcess(processId: String, processingUnit: U): ProcessingResult } diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala index 528764500d..c640fd611d 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala @@ -3,7 +3,9 @@ */ package akka.persistence.testkit.query.javadsl + import akka.NotUsed +import akka.japi.Pair import akka.persistence.query.EventEnvelope import akka.persistence.query.Offset import akka.persistence.query.javadsl.{ @@ -12,6 +14,8 @@ import akka.persistence.query.javadsl.{ EventsByPersistenceIdQuery, ReadJournal } +import akka.persistence.query.typed +import akka.persistence.query.typed.javadsl.CurrentEventsBySliceQuery import akka.stream.javadsl.Source import akka.persistence.testkit.query.scaladsl @@ -23,7 +27,8 @@ final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitR extends ReadJournal with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery - with CurrentEventsByTagQuery { + with CurrentEventsByTagQuery + with CurrentEventsBySliceQuery { override def eventsByPersistenceId( persistenceId: String, @@ -40,4 +45,21 @@ final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitR override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = delegate.currentEventsByTag(tag, offset).asJava + override def currentEventsBySlices[Event]( + entityType: String, + minSlice: Int, + maxSlice: Int, + offset: Offset): Source[typed.EventEnvelope[Event], NotUsed] = + delegate.currentEventsBySlices(entityType, minSlice, maxSlice, offset).asJava + + override def sliceForPersistenceId(persistenceId: String): Int = + delegate.sliceForPersistenceId(persistenceId) + + override def sliceRanges(numberOfRanges: Int): java.util.List[Pair[Integer, Integer]] = { + import akka.util.ccompat.JavaConverters._ + delegate + .sliceRanges(numberOfRanges) + .map(range => Pair(Integer.valueOf(range.min), Integer.valueOf(range.max))) + .asJava + } } diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala index 5fb92520a4..34e6438c25 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala @@ -24,6 +24,11 @@ import akka.util.unused import com.typesafe.config.Config import org.slf4j.LoggerFactory +import akka.persistence.Persistence +import akka.persistence.query.typed +import akka.persistence.query.typed.scaladsl.CurrentEventsBySliceQuery +import akka.persistence.typed.PersistenceId + object PersistenceTestKitReadJournal { val Identifier = "akka.persistence.testkit.query" } @@ -33,6 +38,7 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused c with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery with CurrentEventsByTagQuery + with CurrentEventsBySliceQuery with PagedPersistenceIdsQuery { private val log = LoggerFactory.getLogger(getClass) @@ -44,6 +50,8 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused c InMemStorageExtension(system).storageFor(storagePluginId) } + private val persistence = Persistence(system) + private def unwrapTaggedPayload(payload: Any): Any = payload match { case Tagged(payload, _) => payload case payload => payload @@ -88,6 +96,41 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused c } } + override def currentEventsBySlices[Event]( + entityType: String, + minSlice: Int, + maxSlice: Int, + offset: Offset): Source[typed.EventEnvelope[Event], NotUsed] = { + offset match { + case NoOffset => + case _ => + throw new UnsupportedOperationException("Offsets not supported for persistence test kit currentEventsByTag yet") + } + val prs = storage.tryRead(entityType, repr => { + val pid = repr.persistenceId + val slice = persistence.sliceForPersistenceId(pid) + PersistenceId.extractEntityType(pid) == entityType && slice >= minSlice && slice <= maxSlice + }) + Source(prs).map { pr => + val slice = persistence.sliceForPersistenceId(pr.persistenceId) + new typed.EventEnvelope[Event]( + Sequence(pr.sequenceNr), + pr.persistenceId, + pr.sequenceNr, + Some(pr.payload.asInstanceOf[Event]), + pr.timestamp, + pr.metadata, + entityType, + slice) + } + } + + override def sliceForPersistenceId(persistenceId: String): Int = + persistence.sliceForPersistenceId(persistenceId) + + override def sliceRanges(numberOfRanges: Int): Seq[Range] = + persistence.sliceRanges(numberOfRanges) + /** * Get the current persistence ids. * @@ -102,4 +145,5 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused c */ override def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] = storage.currentPersistenceIds(afterId, limit) + } diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/javadsl/PersistenceTestKitDurableStateStore.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/javadsl/PersistenceTestKitDurableStateStore.scala index 4162e17cb5..47e8c31158 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/javadsl/PersistenceTestKitDurableStateStore.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/javadsl/PersistenceTestKitDurableStateStore.scala @@ -9,10 +9,13 @@ import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters._ + +import akka.japi.Pair import akka.{ Done, NotUsed } import akka.persistence.query.DurableStateChange import akka.persistence.query.Offset import akka.persistence.query.javadsl.{ DurableStateStorePagedPersistenceIdsQuery, DurableStateStoreQuery } +import akka.persistence.query.typed.javadsl.DurableStateStoreBySliceQuery import akka.persistence.state.javadsl.DurableStateUpdateStore import akka.persistence.state.javadsl.GetObjectResult import akka.persistence.testkit.state.scaladsl.{ PersistenceTestKitDurableStateStore => SStore } @@ -25,6 +28,7 @@ object PersistenceTestKitDurableStateStore { class PersistenceTestKitDurableStateStore[A](stateStore: SStore[A]) extends DurableStateUpdateStore[A] with DurableStateStoreQuery[A] + with DurableStateStoreBySliceQuery[A] with DurableStateStorePagedPersistenceIdsQuery[A] { def getObject(persistenceId: String): CompletionStage[GetObjectResult[A]] = @@ -42,6 +46,33 @@ class PersistenceTestKitDurableStateStore[A](stateStore: SStore[A]) def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = { stateStore.currentChanges(tag, offset).asJava } + + override def currentChangesBySlices( + entityType: String, + minSlice: Int, + maxSlice: Int, + offset: Offset): Source[DurableStateChange[A], NotUsed] = + stateStore.currentChangesBySlices(entityType, minSlice, maxSlice, offset).asJava + + override def changesBySlices( + entityType: String, + minSlice: Int, + maxSlice: Int, + offset: Offset): Source[DurableStateChange[A], NotUsed] = + stateStore.changesBySlices(entityType, minSlice, maxSlice, offset).asJava + + override def sliceForPersistenceId(persistenceId: String): Int = + stateStore.sliceForPersistenceId(persistenceId) + + override def sliceRanges(numberOfRanges: Int): java.util.List[Pair[Integer, Integer]] = { + import akka.util.ccompat.JavaConverters._ + stateStore + .sliceRanges(numberOfRanges) + .map(range => Pair(Integer.valueOf(range.min), Integer.valueOf(range.max))) + .asJava + } + override def currentPersistenceIds(afterId: Optional[String], limit: Long): Source[String, NotUsed] = stateStore.currentPersistenceIds(afterId.asScala, limit).asJava + } diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala index 420aa155ba..9321a2a676 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala @@ -7,15 +7,19 @@ package akka.persistence.testkit.state.scaladsl import java.util.concurrent.atomic.AtomicLong import scala.concurrent.Future + import akka.{ Done, NotUsed } import akka.actor.ExtendedActorSystem +import akka.persistence.Persistence import akka.persistence.query.DurableStateChange import akka.persistence.query.scaladsl.{ DurableStateStorePagedPersistenceIdsQuery, DurableStateStoreQuery } import akka.persistence.query.UpdatedDurableState import akka.persistence.query.Offset import akka.persistence.query.NoOffset import akka.persistence.query.Sequence +import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery import akka.persistence.state.scaladsl.{ DurableStateUpdateStore, GetObjectResult } +import akka.persistence.typed.PersistenceId import akka.stream.scaladsl.BroadcastHub import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Source @@ -29,9 +33,11 @@ object PersistenceTestKitDurableStateStore { class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem) extends DurableStateUpdateStore[A] with DurableStateStoreQuery[A] + with DurableStateStoreBySliceQuery[A] with DurableStateStorePagedPersistenceIdsQuery[A] { private implicit val sys: ExtendedActorSystem = system + private val persistence = Persistence(system) private var store = Map.empty[String, Record[A]] private val (publisher, changesSource) = @@ -64,6 +70,10 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem) Future.successful(Done) } + private def storeContains(persistenceId: String): Boolean = this.synchronized { + store.contains(persistenceId) + } + override def changes(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = this.synchronized { val fromOffset = offset match { case NoOffset => EarliestOffset @@ -72,11 +82,11 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem) throw new UnsupportedOperationException(s"$offset not supported in PersistenceTestKitDurableStateStore.") } def byTagFromOffset(rec: Record[A]) = rec.tag == tag && rec.globalOffset > fromOffset - def byTagFromOffsetNotDeleted(rec: Record[A]) = byTagFromOffset(rec) && store.contains(rec.persistenceId) + def byTagFromOffsetNotDeleted(rec: Record[A]) = byTagFromOffset(rec) && storeContains(rec.persistenceId) - Source(store.values.toVector.filter(byTagFromOffset _).sortBy(_.globalOffset)) + Source(store.values.toVector.filter(byTagFromOffset).sortBy(_.globalOffset)) .concat(changesSource) - .filter(byTagFromOffsetNotDeleted _) + .filter(byTagFromOffsetNotDeleted) .statefulMapConcat { () => var globalOffsetSeen = EarliestOffset @@ -101,6 +111,62 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem) }, inclusive = true) } + override def currentChangesBySlices( + entityType: String, + minSlice: Int, + maxSlice: Int, + offset: Offset): Source[DurableStateChange[A], NotUsed] = + this.synchronized { + val currentGlobalOffset = lastGlobalOffset.get() + changesBySlices(entityType, minSlice, maxSlice, offset).takeWhile(_.offset match { + case Sequence(fromOffset) => + fromOffset < currentGlobalOffset + case offset => + throw new UnsupportedOperationException(s"$offset not supported in PersistenceTestKitDurableStateStore.") + }, inclusive = true) + } + + override def changesBySlices( + entityType: String, + minSlice: Int, + maxSlice: Int, + offset: Offset): Source[DurableStateChange[A], NotUsed] = + this.synchronized { + val fromOffset = offset match { + case NoOffset => EarliestOffset + case Sequence(fromOffset) => fromOffset + case offset => + throw new UnsupportedOperationException(s"$offset not supported in PersistenceTestKitDurableStateStore.") + } + def bySliceFromOffset(rec: Record[A]) = { + val slice = persistence.sliceForPersistenceId(rec.persistenceId) + PersistenceId.extractEntityType(rec.persistenceId) == entityType && slice >= minSlice && slice <= maxSlice && rec.globalOffset > fromOffset + } + def bySliceFromOffsetNotDeleted(rec: Record[A]) = + bySliceFromOffset(rec) && storeContains(rec.persistenceId) + + Source(store.values.toVector.filter(bySliceFromOffset).sortBy(_.globalOffset)) + .concat(changesSource) + .filter(bySliceFromOffsetNotDeleted) + .statefulMapConcat { () => + var globalOffsetSeen = EarliestOffset + + { (record: Record[A]) => + if (record.globalOffset > globalOffsetSeen) { + globalOffsetSeen = record.globalOffset + record :: Nil + } else Nil + } + } + .map(_.toDurableStateChange) + } + + override def sliceForPersistenceId(persistenceId: String): Int = + persistence.sliceForPersistenceId(persistenceId) + + override def sliceRanges(numberOfRanges: Int): Seq[Range] = + persistence.sliceRanges(numberOfRanges) + override def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] = this.synchronized { if (limit < 1) { @@ -115,6 +181,7 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem) // Enforce limit in Akka Streams so that we can pass long values to take as is. Source(keys).take(limit) } + } private final case class Record[A]( diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/CurrentEventsBySlicesSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/CurrentEventsBySlicesSpec.scala new file mode 100644 index 0000000000..a70e22e95a --- /dev/null +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/CurrentEventsBySlicesSpec.scala @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2020-2021 Lightbend Inc. + */ + +package akka.persistence.testkit.query + +import org.scalatest.wordspec.AnyWordSpecLike + +import akka.Done +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.persistence.Persistence +import akka.persistence.query.NoOffset +import akka.persistence.query.PersistenceQuery +import akka.persistence.testkit.query.EventsByPersistenceIdSpec.Command +import akka.persistence.testkit.query.EventsByPersistenceIdSpec.testBehaviour +import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.stream.scaladsl.Sink + +class CurrentEventsBySlicesSpec + extends ScalaTestWithActorTestKit(EventsByPersistenceIdSpec.config) + with LogCapturing + with AnyWordSpecLike { + + implicit val classic: akka.actor.ActorSystem = system.classicSystem + + val queries = + PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier) + + def setup(persistenceId: String): ActorRef[Command] = { + val probe = createTestProbe[Done]() + val ref = spawn(testBehaviour(persistenceId)) + ref ! Command(s"$persistenceId-1", probe.ref) + ref ! Command(s"$persistenceId-2", probe.ref) + ref ! Command(s"$persistenceId-3", probe.ref) + probe.expectMessage(Done) + probe.expectMessage(Done) + probe.expectMessage(Done) + ref + } + + "Persistent test kit currentEventsByTag query" must { + + "find eventsBySlices ordered by insert time" in { + val probe = createTestProbe[Done]() + val ref1 = spawn(testBehaviour("Test|pid-1")) + val ref2 = spawn(testBehaviour("Test|pid-2")) + ref1 ! Command("evt-1", probe.ref) + ref1 ! Command("evt-2", probe.ref) + ref1 ! Command("evt-3", probe.ref) + probe.receiveMessages(3) + ref2 ! Command("evt-4", probe.ref) + probe.receiveMessage() + ref1 ! Command("evt-5", probe.ref) + probe.receiveMessage() + + queries + .currentEventsBySlices[String]("Test", 0, Persistence(system).numberOfSlices - 1, NoOffset) + .runWith(Sink.seq) + .futureValue + .map(_.event) should ===(Seq("evt-1", "evt-2", "evt-3", "evt-4", "evt-5")) + } + } + +} diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/CurrentEventsByTagSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/CurrentEventsByTagSpec.scala index bea534df52..73ee78e9db 100644 --- a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/CurrentEventsByTagSpec.scala +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/CurrentEventsByTagSpec.scala @@ -23,21 +23,9 @@ class CurrentEventsByTagSpec implicit val classic: akka.actor.ActorSystem = system.classicSystem - val queries = + private val queries = PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier) - def setup(persistenceId: String): ActorRef[Command] = { - val probe = createTestProbe[Done]() - val ref = setupEmpty(persistenceId) - ref ! Command(s"$persistenceId-1", probe.ref) - ref ! Command(s"$persistenceId-2", probe.ref) - ref ! Command(s"$persistenceId-3", probe.ref) - probe.expectMessage(Done) - probe.expectMessage(Done) - probe.expectMessage(Done) - ref - } - def setupEmpty(persistenceId: String): ActorRef[Command] = { spawn( testBehaviour(persistenceId).withTagger(evt => diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala index 856072349b..1349b7be45 100644 --- a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala @@ -2,7 +2,7 @@ * Copyright (C) 2021 Lightbend Inc. */ -package akka.persistence.testkit.query +package akka.persistence.testkit.state.scaladsl import akka.actor.ExtendedActorSystem import akka.actor.testkit.typed.scaladsl.LogCapturing @@ -121,6 +121,81 @@ class PersistenceTestKitDurableStateStoreSpec testSinkCurrentChanges.request(1).expectComplete() } + "find state changes by slice ordered by upsert" in { + val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem]) + val record = Record(1, "name-1") + val recordChange = Record(1, "my-name-1") + stateStore.upsertObject("Test|record-1", 1L, record, "") + val maxSlice = stateStore.sliceRanges(1).head.max + val testSink = stateStore + .changesBySlices("Test", 0, maxSlice, NoOffset) + .collect { case u: UpdatedDurableState[Record] => u } + .runWith(TestSink[UpdatedDurableState[Record]]()) + + val firstStateChange = testSink.request(1).expectNext() + firstStateChange.value should be(record) + firstStateChange.revision should be(1L) + + stateStore.upsertObject("Test|record-1", 2L, recordChange, "") + val secondStateChange = testSink.request(1).expectNext() + secondStateChange.value should be(recordChange) + secondStateChange.revision should be(2L) + secondStateChange.offset + .asInstanceOf[Sequence] + .value should be >= (firstStateChange.offset.asInstanceOf[Sequence].value) + } + + "find current state changes by slice ordered by upsert" in { + val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem]) + val record = Record(1, "name-1") + + stateStore.upsertObject("Test|record-1", 1L, record, "") + + val maxSlice = stateStore.sliceRanges(1).head.max + val testSinkCurrentChanges = + stateStore + .currentChangesBySlices("Test", 0, maxSlice, NoOffset) + .collect { case u: UpdatedDurableState[Record] => u } + .runWith(TestSink[UpdatedDurableState[Record]]()) + + stateStore.upsertObject("record-1", 2L, record.copy(name = "my-name-1-2"), "") + stateStore.upsertObject("record-1", 3L, record.copy(name = "my-name-1-3"), "") + + val currentStateChange = testSinkCurrentChanges.request(1).expectNext() + + currentStateChange.value should be(record) + currentStateChange.revision should be(1L) + testSinkCurrentChanges.request(1).expectComplete() + + val testSinkIllegalOffset = + stateStore + .currentChangesBySlices("Test", 0, maxSlice, Sequence(100L)) + .collect { case u: UpdatedDurableState[Record] => u } + .runWith(TestSink[UpdatedDurableState[Record]]()) + testSinkIllegalOffset.request(1).expectNoMessage() + } + + "return current changes by slice when there are no further changes" in { + val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem]) + val record = Record(1, "name-1") + val tag = "tag-1" + + stateStore.upsertObject("Test|record-1", 1L, record, tag) + + val maxSlice = stateStore.sliceRanges(1).head.max + val testSinkCurrentChanges = + stateStore + .currentChangesBySlices("Test", 0, maxSlice, NoOffset) + .collect { case u: UpdatedDurableState[Record] => u } + .runWith(TestSink[UpdatedDurableState[Record]]()) + + val currentStateChange = testSinkCurrentChanges.request(1).expectNext() + + currentStateChange.value should be(record) + currentStateChange.revision should be(1L) + testSinkCurrentChanges.request(1).expectComplete() + } + "return all current persistence ids" in { val stateStore = new PersistenceTestKitDurableStateStore[Record](classic.asInstanceOf[ExtendedActorSystem])