diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlyTest.java index 4b2eab2fce..64a528d92a 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlyTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlyTest.java @@ -6,15 +6,20 @@ package jdocs.akka.cluster.sharding.typed; import akka.NotUsed; import akka.actor.ActorSystem; -import akka.stream.javadsl.Source; -import akka.persistence.query.DurableStateChange; import akka.persistence.query.Offset; -import akka.persistence.query.javadsl.DurableStateStoreQuery; +import akka.stream.javadsl.Source; + +// #get-durable-state-store-query-example import akka.persistence.state.DurableStateStoreRegistry; -import akka.persistence.state.javadsl.*; +import akka.persistence.query.javadsl.DurableStateStoreQuery; +import akka.persistence.query.DurableStateChange; +import akka.persistence.query.UpdatedDurableState; + +// #get-durable-state-store-query-example class DurableStateStoreQueryUsageCompileOnlySpec { + @SuppressWarnings("unchecked") public DurableStateStoreQuery getQuery( ActorSystem system, String pluginId, Offset offset) { // #get-durable-state-store-query-example @@ -23,6 +28,15 @@ class DurableStateStoreQueryUsageCompileOnlySpec { .getDurableStateStoreFor(DurableStateStoreQuery.class, pluginId); Source, NotUsed> source = durableStateStoreQuery.changes("tag", offset); + source.map( + chg -> { + if (chg instanceof UpdatedDurableState) { + UpdatedDurableState upd = (UpdatedDurableState) chg; + return upd.value(); + } else { + throw new IllegalArgumentException("Unexpected DurableStateChange " + chg.getClass()); + } + }); // #get-durable-state-store-query-example return durableStateStoreQuery; } diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlySpec.scala index 405138c5d8..c89a9a5a13 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlySpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlySpec.scala @@ -5,21 +5,27 @@ package docs.akka.cluster.sharding.typed import scala.annotation.nowarn + import akka.NotUsed -import akka.stream.scaladsl.Source import akka.actor.ActorSystem -import akka.persistence.query.DurableStateChange import akka.persistence.query.Offset -import akka.persistence.query.scaladsl.DurableStateStoreQuery -import akka.persistence.state.DurableStateStoreRegistry +import akka.stream.scaladsl.Source @nowarn object DurableStateStoreQueryUsageCompileOnlySpec { def getQuery[Record](system: ActorSystem, pluginId: String, offset: Offset) = { //#get-durable-state-store-query-example + import akka.persistence.state.DurableStateStoreRegistry + import akka.persistence.query.scaladsl.DurableStateStoreQuery + import akka.persistence.query.DurableStateChange + import akka.persistence.query.UpdatedDurableState + val durableStateStoreQuery = DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[Record]](pluginId) val source: Source[DurableStateChange[Record], NotUsed] = durableStateStoreQuery.changes("tag", offset) + source.map { + case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) => value + } //#get-durable-state-store-query-example } } diff --git a/akka-docs/src/main/paradox/durable-state/persistence-query.md b/akka-docs/src/main/paradox/durable-state/persistence-query.md index 88cb5e268b..a31c953d60 100644 --- a/akka-docs/src/main/paradox/durable-state/persistence-query.md +++ b/akka-docs/src/main/paradox/durable-state/persistence-query.md @@ -45,3 +45,5 @@ Scala Java : @@snip [DurableStateStoreQueryUsageCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlyTest.java) { #get-durable-state-store-query-example } +The @apidoc[DurableStateChange] elements can be `UpdatedDurableState` or `DeletedDurableState`. +`DeletedDurableState` is not implemented yet. diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/DurableStateChange.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/DurableStateChange.scala index 4ad6dc6db6..684f662c95 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/DurableStateChange.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/DurableStateChange.scala @@ -5,12 +5,49 @@ package akka.persistence.query import akka.annotation.ApiMayChange +import akka.annotation.DoNotInherit + +/** + * API May Change + * + * The `DurableStateStoreQuery` stream elements for `DurableStateStoreQuery`. + * + * The implementation can be a [[UpdatedDurableState]] or a `DeletedDurableState`. + * `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446 + * + * Not for user extension + * + * @tparam A the type of the value + */ +@DoNotInherit +sealed trait DurableStateChange[A] { + + /** + * The persistence id of the origin entity. + */ + def persistenceId: String + + /** + * The offset that can be used in next `changes` or `currentChanges` query. + */ + def offset: Offset +} + +object UpdatedDurableState { + + /** + * API May Change + */ + @ApiMayChange + def unapply[A](arg: UpdatedDurableState[A]): Option[(String, Long, A, Offset, Long)] = + Some((arg.persistenceId, arg.revision, arg.value, arg.offset, arg.timestamp)) +} /** * API May Change * * @param persistenceId The persistence id of the origin entity. - * @param seqNr The sequence number from the origin entity. + * @param revision The revision number from the origin entity. * @param value The object value. * @param offset The offset that can be used in next `changes` or `currentChanges` query. * @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC @@ -18,9 +55,10 @@ import akka.annotation.ApiMayChange * @tparam A the type of the value */ @ApiMayChange -final class DurableStateChange[A]( +final class UpdatedDurableState[A]( val persistenceId: String, val revision: Long, val value: A, - val offset: Offset, + override val offset: Offset, val timestamp: Long) + extends DurableStateChange[A] diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/DurableStateStoreQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/DurableStateStoreQuery.scala index e0f1bdaa99..fe48af5358 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/DurableStateStoreQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/DurableStateStoreQuery.scala @@ -30,6 +30,9 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] { * This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to * objects made since materialization are not guaranteed to be included in the results. * + * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. + * `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. + * * @param tag The tag to get changes for. * @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get * changes since the beginning of time, or an offset that has been previously returned by this query. @@ -49,6 +52,9 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] { * in quick succession are likely to be skipped, with only the last update resulting in a change from this * source. * + * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. + * `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. + * * @param tag The tag to get changes for. * @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get * changes since the beginning of time, or an offset that has been previously returned by this query. diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/DurableStateStoreQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/DurableStateStoreQuery.scala index de5150f3f5..813a36d78e 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/DurableStateStoreQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/DurableStateStoreQuery.scala @@ -6,8 +6,8 @@ package akka.persistence.query.scaladsl import akka.NotUsed import akka.annotation.ApiMayChange -import akka.persistence.state.scaladsl.DurableStateStore import akka.persistence.query.DurableStateChange +import akka.persistence.state.scaladsl.DurableStateStore import akka.persistence.query.Offset import akka.stream.scaladsl.Source @@ -30,6 +30,9 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] { * This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to * objects made since materialization are not guaranteed to be included in the results. * + * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. + * `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. + * * @param tag The tag to get changes for. * @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get * changes since the beginning of time, or an offset that has been previously returned by this query. @@ -49,6 +52,9 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] { * in quick succession are likely to be skipped, with only the last update resulting in a change from this * source. * + * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. + * `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. + * * @param tag The tag to get changes for. * @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get * changes since the beginning of time, or an offset that has been previously returned by this query. diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/javadsl/PersistenceTestKitDurableStateStore.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/javadsl/PersistenceTestKitDurableStateStore.scala index 5d82d06046..c73d1bc90a 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/javadsl/PersistenceTestKitDurableStateStore.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/javadsl/PersistenceTestKitDurableStateStore.scala @@ -7,11 +7,13 @@ package akka.persistence.testkit.state.javadsl import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters._ + import akka.Done -import akka.persistence.query.javadsl.DurableStateStoreQuery import akka.persistence.query.DurableStateChange import akka.persistence.query.Offset -import akka.persistence.state.javadsl.{ DurableStateUpdateStore, GetObjectResult } +import akka.persistence.query.javadsl.DurableStateStoreQuery +import akka.persistence.state.javadsl.DurableStateUpdateStore +import akka.persistence.state.javadsl.GetObjectResult import akka.persistence.testkit.state.scaladsl.{ PersistenceTestKitDurableStateStore => SStore } import akka.stream.javadsl.Source diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala index 59eeb416cb..b3be044944 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala @@ -10,8 +10,9 @@ import scala.concurrent.Future import akka.Done import akka.actor.ExtendedActorSystem -import akka.persistence.query.scaladsl.DurableStateStoreQuery import akka.persistence.query.DurableStateChange +import akka.persistence.query.scaladsl.DurableStateStoreQuery +import akka.persistence.query.UpdatedDurableState import akka.persistence.query.Offset import akka.persistence.query.NoOffset import akka.persistence.query.Sequence @@ -104,5 +105,5 @@ private final case class Record[A]( tag: String, timestamp: Long = System.currentTimeMillis) { def toDurableStateChange: DurableStateChange[A] = - new DurableStateChange(persistenceId, revision, value, Sequence(globalOffset), timestamp) + new UpdatedDurableState(persistenceId, revision, value, Sequence(globalOffset), timestamp) } diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala index 92bc45af04..b975ad5ed5 100644 --- a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreSpec.scala @@ -9,7 +9,7 @@ import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.persistence.query.NoOffset import akka.persistence.query.Sequence -import akka.persistence.query.DurableStateChange +import akka.persistence.query.UpdatedDurableState import akka.persistence.testkit.state.scaladsl.PersistenceTestKitDurableStateStore import akka.persistence.testkit.PersistenceTestKitDurableStateStorePlugin import akka.stream.testkit.scaladsl.TestSink @@ -42,7 +42,10 @@ class PersistenceTestKitDurableStateStoreSpec val recordChange = Record(1, "my-name-1") val tag = "tag-1" stateStore.upsertObject("record-1", 1L, record, tag) - val testSink = stateStore.changes(tag, NoOffset).runWith(TestSink[DurableStateChange[Record]]()) + val testSink = stateStore + .changes(tag, NoOffset) + .collect { case u: UpdatedDurableState[Record] => u } + .runWith(TestSink[UpdatedDurableState[Record]]()) val firstStateChange = testSink.request(1).expectNext() firstStateChange.value should be(record) @@ -65,7 +68,10 @@ class PersistenceTestKitDurableStateStoreSpec stateStore.upsertObject("record-1", 1L, record, tag) val testSinkCurrentChanges = - stateStore.currentChanges(tag, NoOffset).runWith(TestSink[DurableStateChange[Record]]()) + stateStore + .currentChanges(tag, NoOffset) + .collect { case u: UpdatedDurableState[Record] => u } + .runWith(TestSink[UpdatedDurableState[Record]]()) stateStore.upsertObject("record-1", 2L, record.copy(name = "my-name-1-2"), tag) stateStore.upsertObject("record-1", 3L, record.copy(name = "my-name-1-3"), tag) @@ -77,7 +83,10 @@ class PersistenceTestKitDurableStateStoreSpec testSinkCurrentChanges.request(1).expectComplete() val testSinkIllegalOffset = - stateStore.currentChanges(tag, Sequence(100L)).runWith(TestSink[DurableStateChange[Record]]()) + stateStore + .currentChanges(tag, Sequence(100L)) + .collect { case u: UpdatedDurableState[Record] => u } + .runWith(TestSink[UpdatedDurableState[Record]]()) testSinkIllegalOffset.request(1).expectNoMessage() } }