diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala index 93cf062764..b87589e68f 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala @@ -30,6 +30,7 @@ import pekko.actor.typed.scaladsl.Behaviors import pekko.annotation._ import pekko.persistence.RecoveryPermitter import pekko.persistence.typed.state.scaladsl._ +import pekko.persistence.state.exception.DurableStateException import pekko.persistence.state.scaladsl.GetObjectResult import pekko.persistence.typed.PersistenceId import pekko.persistence.typed.SnapshotAdapter @@ -153,7 +154,7 @@ private[pekko] final case class DurableStateBehaviorImpl[Command, State]( } } - .onFailure[DurableStateStoreException](supervisionStrategy) + .onFailure[DurableStateException](supervisionStrategy) } @InternalStableApi diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreException.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreException.scala index 3a75d03a4a..5f0dcfbcf1 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreException.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreException.scala @@ -15,6 +15,7 @@ package org.apache.pekko.persistence.typed.state.internal import org.apache.pekko import pekko.annotation.InternalApi +import pekko.persistence.state.exception.DurableStateException import pekko.persistence.typed.PersistenceId /** @@ -24,7 +25,7 @@ import pekko.persistence.typed.PersistenceId */ @InternalApi final private[pekko] class DurableStateStoreException(msg: String, cause: Throwable) - extends RuntimeException(msg, cause) { + extends DurableStateException(msg, cause) { def this(persistenceId: PersistenceId, sequenceNr: Long, cause: Throwable) = this(s"Failed to persist state with sequence number [$sequenceNr] for persistenceId [${persistenceId.id}]", cause) } diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/state/exception/DurableStateException.scala b/persistence/src/main/scala/org/apache/pekko/persistence/state/exception/DurableStateException.scala new file mode 100644 index 0000000000..d1908bc742 --- /dev/null +++ b/persistence/src/main/scala/org/apache/pekko/persistence/state/exception/DurableStateException.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.persistence.state.exception + +import scala.util.control.NoStackTrace + +/** + * Exception thrown when Durable State cannot be updated or deleted. + * + * @param msg the exception message + * @param cause the exception cause + * @since 1.1.0 + */ +abstract class DurableStateException(msg: String, cause: Throwable) + extends RuntimeException(msg, cause) { + def this(msg: String) = this(msg, null) +} + +/** + * Exception thrown when Durable State revision cannot be deleted. + * The revision could be out of date. + * + * @param msg the exception message + * @since 1.1.0 + */ +final class DeleteRevisionException(msg: String) + extends DurableStateException(msg) with NoStackTrace diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/state/javadsl/DurableStateUpdateStore.scala b/persistence/src/main/scala/org/apache/pekko/persistence/state/javadsl/DurableStateUpdateStore.scala index ca2a5c69c1..82b5ba8ef8 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/state/javadsl/DurableStateUpdateStore.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/state/javadsl/DurableStateUpdateStore.scala @@ -26,12 +26,39 @@ import pekko.Done trait DurableStateUpdateStore[A] extends DurableStateStore[A] { /** - * @param seqNr sequence number for optimistic locking. starts at 1. + * Upsert the object with the given `persistenceId` and `revision`. + * + * @param persistenceId the persistenceId of the object to upsert + * @param revision the revision of the object to upsert + * @param value the value to upsert + * @param tag a tag to associate with the object + * @return a CompletionStage that completes when the object has been upserted */ def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done] + /** + * Delete the object with the given `persistenceId`. This deprecated + * function ignores whether the object is deleted or not. + * + * @param persistenceId the persistenceId of the object to delete + * @param revision the revision of the object to delete + * @return a CompletionStage that completes when the object has been deleted + */ @deprecated(message = "Use the deleteObject overload with revision instead.", since = "Akka 2.6.20") def deleteObject(persistenceId: String): CompletionStage[Done] + /** + * Delete the object with the given `persistenceId` and `revision`. + * + *

+ * Since Pekko v1.1, if the revision does not match the current revision + * of the object, the delete operation will fail. The returned CompletionStage + * will complete with a failed result wrapping the exception. + *

+ * + * @param persistenceId the persistenceId of the object to delete + * @param revision the revision of the object to delete + * @return a CompletionStage that completes when the object has been deleted + */ def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] } diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/state/scaladsl/DurableStateUpdateStore.scala b/persistence/src/main/scala/org/apache/pekko/persistence/state/scaladsl/DurableStateUpdateStore.scala index 0d44597421..be38e4d885 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/state/scaladsl/DurableStateUpdateStore.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/state/scaladsl/DurableStateUpdateStore.scala @@ -26,12 +26,39 @@ import pekko.Done trait DurableStateUpdateStore[A] extends DurableStateStore[A] { /** - * @param seqNr sequence number for optimistic locking. starts at 1. + * Upsert the object with the given `persistenceId` and `revision`. + * + * @param persistenceId the persistenceId of the object to upsert + * @param revision the revision of the object to upsert + * @param value the value to upsert + * @param tag a tag to associate with the object + * @return a Future that completes when the object has been upserted */ def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] + /** + * Delete the object with the given `persistenceId`. This deprecated + * function ignores whether the object is deleted or not. + * + * @param persistenceId the persistenceId of the object to delete + * @param revision the revision of the object to delete + * @return a Future that completes when the object has been deleted + */ @deprecated(message = "Use the deleteObject overload with revision instead.", since = "Akka 2.6.20") def deleteObject(persistenceId: String): Future[Done] + /** + * Delete the object with the given `persistenceId` and `revision`. + * + *

+ * Since Pekko v1.1, if the revision does not match the current revision + * of the object, the delete operation will fail. The returned Future + * will complete with a failed result wrapping the exception. + *

+ * + * @param persistenceId the persistenceId of the object to delete + * @param revision the revision of the object to delete + * @return a Future that completes when the object has been deleted + */ def deleteObject(persistenceId: String, revision: Long): Future[Done] } diff --git a/persistence/src/test/scala/org/apache/pekko/persistence/state/exception/DurableStateExceptionsSpec.scala b/persistence/src/test/scala/org/apache/pekko/persistence/state/exception/DurableStateExceptionsSpec.scala new file mode 100644 index 0000000000..0d6dcd7a54 --- /dev/null +++ b/persistence/src/test/scala/org/apache/pekko/persistence/state/exception/DurableStateExceptionsSpec.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.persistence.state.exception + +import java.lang.invoke.{ MethodHandles, MethodType } + +import scala.util.Try +import scala.util.control.NoStackTrace + +import org.scalatest.wordspec.AnyWordSpecLike +import org.scalatest.matchers.should.Matchers + +/** + * Tests for [[DurableStateException]]s. + *

+ * To avoid making Pekko persistence implementations dependent on + * Pekko v1.1, these exceptions will be created using MethodHandles. + *

+ */ +class DurableStateExceptionsSpec extends AnyWordSpecLike + with Matchers { + + private val methodHandleLookup = MethodHandles.publicLookup() + + "DurableStateException support" must { + "allow creating DeleteRevisionException using MethodHandle" in { + val exceptionClassOpt: Option[Class[_]] = Try(Class.forName( + "org.apache.pekko.persistence.state.exception.DeleteRevisionException")).toOption + exceptionClassOpt should not be empty + val constructorOpt = exceptionClassOpt.map { clz => + val mt = MethodType.methodType(classOf[Unit], classOf[String]) + methodHandleLookup.findConstructor(clz, mt) + } + constructorOpt should not be empty + val constructor = constructorOpt.get + val ex = constructor.invoke("delete failed").asInstanceOf[Exception] + ex shouldBe an[DeleteRevisionException] + ex shouldBe an[NoStackTrace] + ex.getMessage shouldEqual "delete failed" + } + } +}