Delete effect in durable state, #30446 (#31529)

* Add reset effect to the state dsl.
* The effect calls deleteObject in the durable state store.
* The effect updates state to the empty state.
* Implement DeletedDurableState for persistence query.
* Update PersistenceTestKitDurableStateStore so that deleteObject sets the Record payload to None, ie delete the payload.
* update documentation for delete effect
* increment the revision by one when deleting state
* Overload deleteObject with revision and deprecate deleteObject.
* add bin-comp exclude

(cherry picked from commit 34a621a7cd)
This commit is contained in:
Joseph Ausmann 2022-09-02 08:56:35 +02:00 committed by Patrik Nordwall
parent 1414566930
commit 66afe3fefb
21 changed files with 147 additions and 44 deletions

View file

@ -5,10 +5,9 @@
package docs.akka.cluster.sharding.typed package docs.akka.cluster.sharding.typed
import scala.annotation.nowarn import scala.annotation.nowarn
import akka.NotUsed import akka.NotUsed
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.persistence.query.Offset import akka.persistence.query.{ DeletedDurableState, Offset }
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
@nowarn @nowarn
@ -24,7 +23,8 @@ object DurableStateStoreQueryUsageCompileOnlySpec {
DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[Record]](pluginId) DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[Record]](pluginId)
val source: Source[DurableStateChange[Record], NotUsed] = durableStateStoreQuery.changes("tag", offset) val source: Source[DurableStateChange[Record], NotUsed] = durableStateStoreQuery.changes("tag", offset)
source.map { source.map {
case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) => value case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) => Some(value)
case _: DeletedDurableState[_] => None
} }
//#get-durable-state-store-query-example //#get-durable-state-store-query-example
} }

View file

@ -46,4 +46,3 @@ Java
: @@snip [DurableStateStoreQueryUsageCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlyTest.java) { #get-durable-state-store-query-example } : @@snip [DurableStateStoreQueryUsageCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlyTest.java) { #get-durable-state-store-query-example }
The @apidoc[DurableStateChange] elements can be `UpdatedDurableState` or `DeletedDurableState`. The @apidoc[DurableStateChange] elements can be `UpdatedDurableState` or `DeletedDurableState`.
`DeletedDurableState` is not implemented yet.

View file

@ -152,6 +152,7 @@ and can be one of:
* `persist` will persist the latest state. If it's a new persistence id, the record will be inserted. In case of an existing * `persist` will persist the latest state. If it's a new persistence id, the record will be inserted. In case of an existing
persistence id, the record will be updated only if the revision number of the incoming record is 1 more than the already persistence id, the record will be updated only if the revision number of the incoming record is 1 more than the already
existing record. Otherwise `persist` will fail. existing record. Otherwise `persist` will fail.
* `delete` will delete the state by setting it to the empty state and the revision number will be incremented by 1.
* `none` no state to be persisted, for example a read-only command * `none` no state to be persisted, for example a read-only command
* `unhandled` the command is unhandled (not supported) in current state * `unhandled` the command is unhandled (not supported) in current state
* `stop` stop this actor * `stop` stop this actor

View file

@ -9,8 +9,7 @@ import akka.annotation.DoNotInherit
/** /**
* The `DurableStateStoreQuery` stream elements for `DurableStateStoreQuery`. * The `DurableStateStoreQuery` stream elements for `DurableStateStoreQuery`.
* *
* The implementation can be a [[UpdatedDurableState]] or a `DeletedDurableState`. * The implementation can be a [[UpdatedDurableState]] or a [[DeletedDurableState]].
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446
* *
* Not for user extension * Not for user extension
* *
@ -53,3 +52,25 @@ final class UpdatedDurableState[A](
override val offset: Offset, override val offset: Offset,
val timestamp: Long) val timestamp: Long)
extends DurableStateChange[A] extends DurableStateChange[A]
object DeletedDurableState {
def unapply[A](arg: DeletedDurableState[A]): Option[(String, Long, Offset, Long)] =
Some((arg.persistenceId, arg.revision, arg.offset, arg.timestamp))
}
/**
*
* @param persistenceId The persistence id of the origin entity.
* @param revision The revision number from the origin entity.
* @param offset The offset that can be used in next `changes` or `currentChanges` query.
* @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC
* (same as `System.currentTimeMillis`).
* @tparam A the type of the value
*/
final class DeletedDurableState[A](
val persistenceId: String,
val revision: Long,
override val offset: Offset,
val timestamp: Long)
extends DurableStateChange[A]

View file

@ -26,8 +26,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to * This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results. * objects made since materialization are not guaranteed to be included in the results.
* *
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. * [[akka.persistence.query.DeletedDurableState]].
* *
* @param tag The tag to get changes for. * @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get * @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get
@ -48,8 +48,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* in quick succession are likely to be skipped, with only the last update resulting in a change from this * in quick succession are likely to be skipped, with only the last update resulting in a change from this
* source. * source.
* *
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. * [[akka.persistence.query.DeletedDurableState]].
* *
* @param tag The tag to get changes for. * @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get * @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get

View file

@ -26,8 +26,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to * This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results. * objects made since materialization are not guaranteed to be included in the results.
* *
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. * [[akka.persistence.query.DeletedDurableState]].
* *
* @param tag The tag to get changes for. * @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get * @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get
@ -48,8 +48,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* in quick succession are likely to be skipped, with only the last update resulting in a change from this * in quick succession are likely to be skipped, with only the last update resulting in a change from this
* source. * source.
* *
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. * [[akka.persistence.query.DeletedDurableState]].
* *
* @param tag The tag to get changes for. * @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get * @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get

View file

@ -34,8 +34,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to * This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results. * objects made since materialization are not guaranteed to be included in the results.
* *
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. * [[akka.persistence.query.DeletedDurableState]].
*/ */
def currentChangesBySlices( def currentChangesBySlices(
entityType: String, entityType: String,
@ -56,8 +56,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
* change for each object since the offset will be emitted. In particular, multiple updates to a given object in quick * change for each object since the offset will be emitted. In particular, multiple updates to a given object in quick
* succession are likely to be skipped, with only the last update resulting in a change from this source. * succession are likely to be skipped, with only the last update resulting in a change from this source.
* *
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. * [[akka.persistence.query.DeletedDurableState]].
*/ */
def changesBySlices( def changesBySlices(
entityType: String, entityType: String,

View file

@ -35,8 +35,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to * This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results. * objects made since materialization are not guaranteed to be included in the results.
* *
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. * [[akka.persistence.query.DeletedDurableState]].
*/ */
def currentChangesBySlices( def currentChangesBySlices(
entityType: String, entityType: String,
@ -57,8 +57,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
* change for each object since the offset will be emitted. In particular, multiple updates to a given object in quick * change for each object since the offset will be emitted. In particular, multiple updates to a given object in quick
* succession are likely to be skipped, with only the last update resulting in a change from this source. * succession are likely to be skipped, with only the last update resulting in a change from this source.
* *
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. * [[akka.persistence.query.DeletedDurableState]].
*/ */
def changesBySlices( def changesBySlices(
entityType: String, entityType: String,

View file

@ -5,11 +5,9 @@
package akka.persistence.testkit.state.javadsl package akka.persistence.testkit.state.javadsl
import java.util.Optional import java.util.Optional
import java.util.concurrent.CompletionStage import java.util.concurrent.{ CompletableFuture, CompletionStage }
import scala.compat.java8.FutureConverters._ import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._ import scala.compat.java8.OptionConverters._
import akka.japi.Pair import akka.japi.Pair
import akka.{ Done, NotUsed } import akka.{ Done, NotUsed }
import akka.persistence.query.DurableStateChange import akka.persistence.query.DurableStateChange
@ -37,8 +35,10 @@ class PersistenceTestKitDurableStateStore[A](stateStore: SStore[A])
def upsertObject(persistenceId: String, seqNr: Long, value: A, tag: String): CompletionStage[Done] = def upsertObject(persistenceId: String, seqNr: Long, value: A, tag: String): CompletionStage[Done] =
stateStore.upsertObject(persistenceId, seqNr, value, tag).toJava stateStore.upsertObject(persistenceId, seqNr, value, tag).toJava
def deleteObject(persistenceId: String): CompletionStage[Done] = def deleteObject(persistenceId: String): CompletionStage[Done] = CompletableFuture.completedFuture(Done)
stateStore.deleteObject(persistenceId).toJava
def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] =
stateStore.deleteObject(persistenceId, revision).toJava
def changes(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = { def changes(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = {
stateStore.changes(tag, offset).asJava stateStore.changes(tag, offset).asJava

View file

@ -11,12 +11,15 @@ import scala.concurrent.Future
import akka.{ Done, NotUsed } import akka.{ Done, NotUsed }
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.persistence.Persistence import akka.persistence.Persistence
import akka.persistence.query.DurableStateChange import akka.persistence.query.{
DeletedDurableState,
DurableStateChange,
NoOffset,
Offset,
Sequence,
UpdatedDurableState
}
import akka.persistence.query.scaladsl.{ DurableStateStorePagedPersistenceIdsQuery, DurableStateStoreQuery } import akka.persistence.query.scaladsl.{ DurableStateStorePagedPersistenceIdsQuery, DurableStateStoreQuery }
import akka.persistence.query.UpdatedDurableState
import akka.persistence.query.Offset
import akka.persistence.query.NoOffset
import akka.persistence.query.Sequence
import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery
import akka.persistence.state.scaladsl.{ DurableStateUpdateStore, GetObjectResult } import akka.persistence.state.scaladsl.{ DurableStateUpdateStore, GetObjectResult }
import akka.persistence.typed.PersistenceId import akka.persistence.typed.PersistenceId
@ -27,8 +30,6 @@ import akka.stream.typed.scaladsl.ActorSource
import akka.stream.OverflowStrategy import akka.stream.OverflowStrategy
import scala.collection.immutable import scala.collection.immutable
import akka.persistence.testkit.internal.CurrentTime
object PersistenceTestKitDurableStateStore { object PersistenceTestKitDurableStateStore {
val Identifier = "akka.persistence.testkit.state" val Identifier = "akka.persistence.testkit.state"
} }
@ -54,22 +55,29 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem)
override def getObject(persistenceId: String): Future[GetObjectResult[A]] = this.synchronized { override def getObject(persistenceId: String): Future[GetObjectResult[A]] = this.synchronized {
Future.successful(store.get(persistenceId) match { Future.successful(store.get(persistenceId) match {
case Some(record) => GetObjectResult(Some(record.value), record.revision) case Some(Record(_, _, revision, Some(value), _, _)) => GetObjectResult(Some(value), revision)
case None => GetObjectResult(None, 0) case Some(Record(_, _, revision, None, _, _)) => GetObjectResult(None, revision)
case None => GetObjectResult(None, 0)
}) })
} }
override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] =
this.synchronized { this.synchronized {
val globalOffset = lastGlobalOffset.incrementAndGet() val globalOffset = lastGlobalOffset.incrementAndGet()
val record = Record(globalOffset, persistenceId, revision, value, tag) val record = Record(globalOffset, persistenceId, revision, Some(value), tag)
store = store + (persistenceId -> record) store = store + (persistenceId -> record)
publisher ! record publisher ! record
Future.successful(Done) Future.successful(Done)
} }
override def deleteObject(persistenceId: String): Future[Done] = this.synchronized { override def deleteObject(persistenceId: String): Future[Done] = Future.successful(Done)
store = store - persistenceId
override def deleteObject(persistenceId: String, revision: Long): Future[Done] = this.synchronized {
store = store.get(persistenceId) match {
case Some(record) => store + (persistenceId -> record.copy(value = None, revision = revision))
case None => store
}
Future.successful(Done) Future.successful(Done)
} }
@ -191,9 +199,15 @@ private final case class Record[A](
globalOffset: Long, globalOffset: Long,
persistenceId: String, persistenceId: String,
revision: Long, revision: Long,
value: A, value: Option[A],
tag: String, tag: String,
timestamp: Long = CurrentTime.now()) { timestamp: Long = System.currentTimeMillis) {
def toDurableStateChange: DurableStateChange[A] = def toDurableStateChange: DurableStateChange[A] = {
new UpdatedDurableState(persistenceId, revision, value, Sequence(globalOffset), timestamp) value match {
case Some(v) =>
new UpdatedDurableState(persistenceId, revision, v, Sequence(globalOffset), timestamp)
case None =>
new DeletedDurableState(persistenceId, revision, Sequence(globalOffset), timestamp)
}
}
} }

View file

@ -31,6 +31,7 @@ object DurableStateBehaviorReplySpec {
final case class IncrementReplyLater(replyTo: ActorRef[Done]) extends Command[Done] final case class IncrementReplyLater(replyTo: ActorRef[Done]) extends Command[Done]
final case class ReplyNow(replyTo: ActorRef[Done]) extends Command[Done] final case class ReplyNow(replyTo: ActorRef[Done]) extends Command[Done]
final case class GetValue(replyTo: ActorRef[State]) extends Command[State] final case class GetValue(replyTo: ActorRef[State]) extends Command[State]
final case class DeleteWithConfirmation(replyTo: ActorRef[Done]) extends Command[Done]
case object Increment extends Command[Nothing] case object Increment extends Command[Nothing]
case class IncrementBy(by: Int) extends Command[Nothing] case class IncrementBy(by: Int) extends Command[Nothing]
@ -61,6 +62,9 @@ object DurableStateBehaviorReplySpec {
case GetValue(replyTo) => case GetValue(replyTo) =>
Effect.reply(replyTo)(state) Effect.reply(replyTo)(state)
case DeleteWithConfirmation(replyTo) =>
Effect.delete[State]().thenReply(replyTo)(_ => Done)
case _ => ??? case _ => ???
}) })
@ -108,5 +112,20 @@ class DurableStateBehaviorReplySpec
c ! GetValue(queryProbe.ref) c ! GetValue(queryProbe.ref)
queryProbe.expectMessage(State(1)) queryProbe.expectMessage(State(1))
} }
"delete state thenReply" in {
val c = spawn(counter(nextPid()))
val updateProbe = TestProbe[Done]()
c ! IncrementWithConfirmation(updateProbe.ref)
updateProbe.expectMessage(Done)
val deleteProbe = TestProbe[Done]()
c ! DeleteWithConfirmation(deleteProbe.ref)
deleteProbe.expectMessage(Done)
val queryProbe = TestProbe[State]()
c ! GetValue(queryProbe.ref)
queryProbe.expectMessage(State(0))
}
} }
} }

View file

@ -176,6 +176,8 @@ private[akka] final case class DurableStateBehaviorImpl[Command, State](
final case class GetFailure(cause: Throwable) extends InternalProtocol final case class GetFailure(cause: Throwable) extends InternalProtocol
case object UpsertSuccess extends InternalProtocol case object UpsertSuccess extends InternalProtocol
final case class UpsertFailure(cause: Throwable) extends InternalProtocol final case class UpsertFailure(cause: Throwable) extends InternalProtocol
case object DeleteSuccess extends InternalProtocol
final case class DeleteFailure(cause: Throwable) extends InternalProtocol
case object RecoveryTimeout extends InternalProtocol case object RecoveryTimeout extends InternalProtocol
final case class IncomingCommand[C](c: C) extends InternalProtocol final case class IncomingCommand[C](c: C) extends InternalProtocol

View file

@ -56,10 +56,30 @@ private[akka] trait DurableStateStoreInteractions[C, S] {
newRunningState newRunningState
} }
protected def internalDelete(
ctx: ActorContext[InternalProtocol],
cmd: Any,
state: Running.RunningState[S]): Running.RunningState[S] = {
val newRunningState = state.nextRevision().copy(state = setup.emptyState)
val persistenceId = setup.persistenceId.id
onDeleteInitiated(ctx, cmd)
ctx.pipeToSelf[Done](setup.durableStateStore.deleteObject(persistenceId, newRunningState.revision)) {
case Success(_) => InternalProtocol.DeleteSuccess
case Failure(cause) => InternalProtocol.DeleteFailure(cause)
}
newRunningState
}
// FIXME These hook methods are for Telemetry. What more parameters are needed? persistenceId? // FIXME These hook methods are for Telemetry. What more parameters are needed? persistenceId?
@InternalStableApi @InternalStableApi
private[akka] def onWriteInitiated(@unused ctx: ActorContext[_], @unused cmd: Any): Unit = () private[akka] def onWriteInitiated(@unused ctx: ActorContext[_], @unused cmd: Any): Unit = ()
private[akka] def onDeleteInitiated(@unused ctx: ActorContext[_], @unused cmd: Any): Unit = ()
protected def requestRecoveryPermit(): Unit = { protected def requestRecoveryPermit(): Unit = {
setup.persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, setup.selfClassic) setup.persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, setup.selfClassic)
} }

View file

@ -71,6 +71,10 @@ private[akka] final case class Persist[State](newState: State) extends EffectImp
override def toString: String = s"Persist(${newState.getClass.getName})" override def toString: String = s"Persist(${newState.getClass.getName})"
} }
/** INTERNAL API */
@InternalApi
private[akka] case class Delete[State]() extends EffectImpl[State]
/** INTERNAL API */ /** INTERNAL API */
@InternalApi @InternalApi
private[akka] case object Unhandled extends EffectImpl[Nothing] private[akka] case object Unhandled extends EffectImpl[Nothing]

View file

@ -88,6 +88,8 @@ private[akka] class Recovering[C, S](
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
case UpsertSuccess => Behaviors.unhandled case UpsertSuccess => Behaviors.unhandled
case _: UpsertFailure => Behaviors.unhandled case _: UpsertFailure => Behaviors.unhandled
case DeleteSuccess => Behaviors.unhandled
case _: DeleteFailure => Behaviors.unhandled
} }
} }

View file

@ -141,6 +141,10 @@ private[akka] object Running {
case _: PersistNothing.type => case _: PersistNothing.type =>
(applySideEffects(sideEffects, state), true) (applySideEffects(sideEffects, state), true)
case _: Delete[_] =>
val nextState = internalDelete(setup.context, msg, state)
(applySideEffects(sideEffects, nextState), true)
case _: Unhandled.type => case _: Unhandled.type =>
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
setup.context.system.toClassic.eventStream setup.context.system.toClassic.eventStream
@ -194,6 +198,8 @@ private[akka] object Running {
case RecoveryPermitGranted => Behaviors.unhandled case RecoveryPermitGranted => Behaviors.unhandled
case _: GetSuccess[_] => Behaviors.unhandled case _: GetSuccess[_] => Behaviors.unhandled
case _: GetFailure => Behaviors.unhandled case _: GetFailure => Behaviors.unhandled
case DeleteSuccess => Behaviors.unhandled
case DeleteFailure(_) => Behaviors.unhandled
} }
} }

View file

@ -25,7 +25,12 @@ object Effect {
*/ */
def persist[State](state: State): EffectBuilder[State] = Persist(state) def persist[State](state: State): EffectBuilder[State] = Persist(state)
// FIXME add delete effect /**
* Delete the persisted state.
*
* Side effects can be chained with `thenRun`
*/
def delete[State](): EffectBuilder[State] = Delete()
/** /**
* Do not persist anything * Do not persist anything

View file

@ -30,6 +30,7 @@ object DurableStatePersistentBehaviorCompileOnly {
final case object Increment extends Command[Nothing] final case object Increment extends Command[Nothing]
final case class IncrementBy(value: Int) extends Command[Nothing] final case class IncrementBy(value: Int) extends Command[Nothing]
final case class GetValue(replyTo: ActorRef[State]) extends Command[State] final case class GetValue(replyTo: ActorRef[State]) extends Command[State]
final case object Delete extends Command[Nothing]
//#command //#command
//#state //#state
@ -44,6 +45,7 @@ object DurableStatePersistentBehaviorCompileOnly {
case Increment => Effect.persist(state.copy(value = state.value + 1)) case Increment => Effect.persist(state.copy(value = state.value + 1))
case IncrementBy(by) => Effect.persist(state.copy(value = state.value + by)) case IncrementBy(by) => Effect.persist(state.copy(value = state.value + by))
case GetValue(replyTo) => Effect.reply(replyTo)(state) case GetValue(replyTo) => Effect.reply(replyTo)(state)
case Delete => Effect.delete[State]()
} }
//#command-handler //#command-handler

View file

@ -0,0 +1,3 @@
# #30446 Addition of deleteObject overloaded method
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.state.javadsl.DurableStateUpdateStore.deleteObject")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.state.scaladsl.DurableStateUpdateStore.deleteObject")

View file

@ -20,5 +20,8 @@ trait DurableStateUpdateStore[A] extends DurableStateStore[A] {
*/ */
def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done] def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done]
@deprecated(message = "Use the deleteObject overload with revision instead.", since = "2.6.20")
def deleteObject(persistenceId: String): CompletionStage[Done] def deleteObject(persistenceId: String): CompletionStage[Done]
def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done]
} }

View file

@ -20,6 +20,8 @@ trait DurableStateUpdateStore[A] extends DurableStateStore[A] {
*/ */
def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done]
@deprecated(message = "Use the deleteObject overload with revision instead.", since = "2.6.20")
def deleteObject(persistenceId: String): Future[Done] def deleteObject(persistenceId: String): Future[Done]
def deleteObject(persistenceId: String, revision: Long): Future[Done]
} }