AST for DurableStateChange to support deletes (#30549)

* UpdatedDurableState and future DeletedDurableState
This commit is contained in:
Patrik Nordwall 2021-08-19 08:46:37 +02:00 committed by GitHub
parent 9f83f437f9
commit 835c9b85f0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 104 additions and 20 deletions

View file

@ -6,15 +6,20 @@ package jdocs.akka.cluster.sharding.typed;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Source;
import akka.persistence.query.DurableStateChange;
import akka.persistence.query.Offset;
import akka.persistence.query.javadsl.DurableStateStoreQuery;
import akka.stream.javadsl.Source;
// #get-durable-state-store-query-example
import akka.persistence.state.DurableStateStoreRegistry;
import akka.persistence.state.javadsl.*;
import akka.persistence.query.javadsl.DurableStateStoreQuery;
import akka.persistence.query.DurableStateChange;
import akka.persistence.query.UpdatedDurableState;
// #get-durable-state-store-query-example
class DurableStateStoreQueryUsageCompileOnlySpec {
@SuppressWarnings("unchecked")
public <Record> DurableStateStoreQuery<Record> getQuery(
ActorSystem system, String pluginId, Offset offset) {
// #get-durable-state-store-query-example
@ -23,6 +28,15 @@ class DurableStateStoreQueryUsageCompileOnlySpec {
.getDurableStateStoreFor(DurableStateStoreQuery.class, pluginId);
Source<DurableStateChange<Record>, NotUsed> source =
durableStateStoreQuery.changes("tag", offset);
source.map(
chg -> {
if (chg instanceof UpdatedDurableState) {
UpdatedDurableState<Record> upd = (UpdatedDurableState<Record>) chg;
return upd.value();
} else {
throw new IllegalArgumentException("Unexpected DurableStateChange " + chg.getClass());
}
});
// #get-durable-state-store-query-example
return durableStateStoreQuery;
}

View file

@ -5,21 +5,27 @@
package docs.akka.cluster.sharding.typed
import scala.annotation.nowarn
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.actor.ActorSystem
import akka.persistence.query.DurableStateChange
import akka.persistence.query.Offset
import akka.persistence.query.scaladsl.DurableStateStoreQuery
import akka.persistence.state.DurableStateStoreRegistry
import akka.stream.scaladsl.Source
@nowarn
object DurableStateStoreQueryUsageCompileOnlySpec {
def getQuery[Record](system: ActorSystem, pluginId: String, offset: Offset) = {
//#get-durable-state-store-query-example
import akka.persistence.state.DurableStateStoreRegistry
import akka.persistence.query.scaladsl.DurableStateStoreQuery
import akka.persistence.query.DurableStateChange
import akka.persistence.query.UpdatedDurableState
val durableStateStoreQuery =
DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[Record]](pluginId)
val source: Source[DurableStateChange[Record], NotUsed] = durableStateStoreQuery.changes("tag", offset)
source.map {
case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) => value
}
//#get-durable-state-store-query-example
}
}

View file

@ -45,3 +45,5 @@ Scala
Java
: @@snip [DurableStateStoreQueryUsageCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlyTest.java) { #get-durable-state-store-query-example }
The @apidoc[DurableStateChange] elements can be `UpdatedDurableState` or `DeletedDurableState`.
`DeletedDurableState` is not implemented yet.

View file

@ -5,12 +5,49 @@
package akka.persistence.query
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
/**
* API May Change
*
* The `DurableStateStoreQuery` stream elements for `DurableStateStoreQuery`.
*
* The implementation can be a [[UpdatedDurableState]] or a `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446
*
* Not for user extension
*
* @tparam A the type of the value
*/
@DoNotInherit
sealed trait DurableStateChange[A] {
/**
* The persistence id of the origin entity.
*/
def persistenceId: String
/**
* The offset that can be used in next `changes` or `currentChanges` query.
*/
def offset: Offset
}
object UpdatedDurableState {
/**
* API May Change
*/
@ApiMayChange
def unapply[A](arg: UpdatedDurableState[A]): Option[(String, Long, A, Offset, Long)] =
Some((arg.persistenceId, arg.revision, arg.value, arg.offset, arg.timestamp))
}
/**
* API May Change
*
* @param persistenceId The persistence id of the origin entity.
* @param seqNr The sequence number from the origin entity.
* @param revision The revision number from the origin entity.
* @param value The object value.
* @param offset The offset that can be used in next `changes` or `currentChanges` query.
* @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC
@ -18,9 +55,10 @@ import akka.annotation.ApiMayChange
* @tparam A the type of the value
*/
@ApiMayChange
final class DurableStateChange[A](
final class UpdatedDurableState[A](
val persistenceId: String,
val revision: Long,
val value: A,
val offset: Offset,
override val offset: Offset,
val timestamp: Long)
extends DurableStateChange[A]

View file

@ -30,6 +30,9 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get
* changes since the beginning of time, or an offset that has been previously returned by this query.
@ -49,6 +52,9 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* in quick succession are likely to be skipped, with only the last update resulting in a change from this
* source.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get
* changes since the beginning of time, or an offset that has been previously returned by this query.

View file

@ -6,8 +6,8 @@ package akka.persistence.query.scaladsl
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.persistence.state.scaladsl.DurableStateStore
import akka.persistence.query.DurableStateChange
import akka.persistence.state.scaladsl.DurableStateStore
import akka.persistence.query.Offset
import akka.stream.scaladsl.Source
@ -30,6 +30,9 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get
* changes since the beginning of time, or an offset that has been previously returned by this query.
@ -49,6 +52,9 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* in quick succession are likely to be skipped, with only the last update resulting in a change from this
* source.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get
* changes since the beginning of time, or an offset that has been previously returned by this query.

View file

@ -7,11 +7,13 @@ package akka.persistence.testkit.state.javadsl
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
import akka.Done
import akka.persistence.query.javadsl.DurableStateStoreQuery
import akka.persistence.query.DurableStateChange
import akka.persistence.query.Offset
import akka.persistence.state.javadsl.{ DurableStateUpdateStore, GetObjectResult }
import akka.persistence.query.javadsl.DurableStateStoreQuery
import akka.persistence.state.javadsl.DurableStateUpdateStore
import akka.persistence.state.javadsl.GetObjectResult
import akka.persistence.testkit.state.scaladsl.{ PersistenceTestKitDurableStateStore => SStore }
import akka.stream.javadsl.Source

View file

@ -10,8 +10,9 @@ import scala.concurrent.Future
import akka.Done
import akka.actor.ExtendedActorSystem
import akka.persistence.query.scaladsl.DurableStateStoreQuery
import akka.persistence.query.DurableStateChange
import akka.persistence.query.scaladsl.DurableStateStoreQuery
import akka.persistence.query.UpdatedDurableState
import akka.persistence.query.Offset
import akka.persistence.query.NoOffset
import akka.persistence.query.Sequence
@ -104,5 +105,5 @@ private final case class Record[A](
tag: String,
timestamp: Long = System.currentTimeMillis) {
def toDurableStateChange: DurableStateChange[A] =
new DurableStateChange(persistenceId, revision, value, Sequence(globalOffset), timestamp)
new UpdatedDurableState(persistenceId, revision, value, Sequence(globalOffset), timestamp)
}

View file

@ -9,7 +9,7 @@ import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.persistence.query.NoOffset
import akka.persistence.query.Sequence
import akka.persistence.query.DurableStateChange
import akka.persistence.query.UpdatedDurableState
import akka.persistence.testkit.state.scaladsl.PersistenceTestKitDurableStateStore
import akka.persistence.testkit.PersistenceTestKitDurableStateStorePlugin
import akka.stream.testkit.scaladsl.TestSink
@ -42,7 +42,10 @@ class PersistenceTestKitDurableStateStoreSpec
val recordChange = Record(1, "my-name-1")
val tag = "tag-1"
stateStore.upsertObject("record-1", 1L, record, tag)
val testSink = stateStore.changes(tag, NoOffset).runWith(TestSink[DurableStateChange[Record]]())
val testSink = stateStore
.changes(tag, NoOffset)
.collect { case u: UpdatedDurableState[Record] => u }
.runWith(TestSink[UpdatedDurableState[Record]]())
val firstStateChange = testSink.request(1).expectNext()
firstStateChange.value should be(record)
@ -65,7 +68,10 @@ class PersistenceTestKitDurableStateStoreSpec
stateStore.upsertObject("record-1", 1L, record, tag)
val testSinkCurrentChanges =
stateStore.currentChanges(tag, NoOffset).runWith(TestSink[DurableStateChange[Record]]())
stateStore
.currentChanges(tag, NoOffset)
.collect { case u: UpdatedDurableState[Record] => u }
.runWith(TestSink[UpdatedDurableState[Record]]())
stateStore.upsertObject("record-1", 2L, record.copy(name = "my-name-1-2"), tag)
stateStore.upsertObject("record-1", 3L, record.copy(name = "my-name-1-3"), tag)
@ -77,7 +83,10 @@ class PersistenceTestKitDurableStateStoreSpec
testSinkCurrentChanges.request(1).expectComplete()
val testSinkIllegalOffset =
stateStore.currentChanges(tag, Sequence(100L)).runWith(TestSink[DurableStateChange[Record]]())
stateStore
.currentChanges(tag, Sequence(100L))
.collect { case u: UpdatedDurableState[Record] => u }
.runWith(TestSink[UpdatedDurableState[Record]]())
testSinkIllegalOffset.request(1).expectNoMessage()
}
}