new durable state persistence exceptions (#1271)

* new persistence exceptions

typos

scalafmt

add annotation

refactor due to review comments

Update DurableStateExceptionSpec.scala

Delete PersistenceException.scala

refactor

Update DurableStateStoreException.scala

refactor again

* refactor again

* Update DurableStateStoreException.scala

* more generic exception

* add javadoc
This commit is contained in:
PJ Fanning 2024-04-22 21:17:10 +02:00 committed by GitHub
parent ad535376f2
commit 64e07dcf1b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 159 additions and 4 deletions

View file

@ -30,6 +30,7 @@ import pekko.actor.typed.scaladsl.Behaviors
import pekko.annotation._
import pekko.persistence.RecoveryPermitter
import pekko.persistence.typed.state.scaladsl._
import pekko.persistence.state.exception.DurableStateException
import pekko.persistence.state.scaladsl.GetObjectResult
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.SnapshotAdapter
@ -153,7 +154,7 @@ private[pekko] final case class DurableStateBehaviorImpl[Command, State](
}
}
.onFailure[DurableStateStoreException](supervisionStrategy)
.onFailure[DurableStateException](supervisionStrategy)
}
@InternalStableApi

View file

@ -15,6 +15,7 @@ package org.apache.pekko.persistence.typed.state.internal
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.persistence.state.exception.DurableStateException
import pekko.persistence.typed.PersistenceId
/**
@ -24,7 +25,7 @@ import pekko.persistence.typed.PersistenceId
*/
@InternalApi
final private[pekko] class DurableStateStoreException(msg: String, cause: Throwable)
extends RuntimeException(msg, cause) {
extends DurableStateException(msg, cause) {
def this(persistenceId: PersistenceId, sequenceNr: Long, cause: Throwable) =
this(s"Failed to persist state with sequence number [$sequenceNr] for persistenceId [${persistenceId.id}]", cause)
}

View file

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pekko.persistence.state.exception
import scala.util.control.NoStackTrace
/**
* Exception thrown when Durable State cannot be updated or deleted.
*
* @param msg the exception message
* @param cause the exception cause
* @since 1.1.0
*/
abstract class DurableStateException(msg: String, cause: Throwable)
extends RuntimeException(msg, cause) {
def this(msg: String) = this(msg, null)
}
/**
* Exception thrown when Durable State revision cannot be deleted.
* The revision could be out of date.
*
* @param msg the exception message
* @since 1.1.0
*/
final class DeleteRevisionException(msg: String)
extends DurableStateException(msg) with NoStackTrace

View file

@ -26,12 +26,39 @@ import pekko.Done
trait DurableStateUpdateStore[A] extends DurableStateStore[A] {
/**
* @param seqNr sequence number for optimistic locking. starts at 1.
* Upsert the object with the given `persistenceId` and `revision`.
*
* @param persistenceId the persistenceId of the object to upsert
* @param revision the revision of the object to upsert
* @param value the value to upsert
* @param tag a tag to associate with the object
* @return a CompletionStage that completes when the object has been upserted
*/
def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done]
/**
* Delete the object with the given `persistenceId`. This deprecated
* function ignores whether the object is deleted or not.
*
* @param persistenceId the persistenceId of the object to delete
* @param revision the revision of the object to delete
* @return a CompletionStage that completes when the object has been deleted
*/
@deprecated(message = "Use the deleteObject overload with revision instead.", since = "Akka 2.6.20")
def deleteObject(persistenceId: String): CompletionStage[Done]
/**
* Delete the object with the given `persistenceId` and `revision`.
*
* <p>
* Since Pekko v1.1, if the revision does not match the current revision
* of the object, the delete operation will fail. The returned CompletionStage
* will complete with a failed result wrapping the exception.
* </p>
*
* @param persistenceId the persistenceId of the object to delete
* @param revision the revision of the object to delete
* @return a CompletionStage that completes when the object has been deleted
*/
def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done]
}

View file

@ -26,12 +26,39 @@ import pekko.Done
trait DurableStateUpdateStore[A] extends DurableStateStore[A] {
/**
* @param seqNr sequence number for optimistic locking. starts at 1.
* Upsert the object with the given `persistenceId` and `revision`.
*
* @param persistenceId the persistenceId of the object to upsert
* @param revision the revision of the object to upsert
* @param value the value to upsert
* @param tag a tag to associate with the object
* @return a Future that completes when the object has been upserted
*/
def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done]
/**
* Delete the object with the given `persistenceId`. This deprecated
* function ignores whether the object is deleted or not.
*
* @param persistenceId the persistenceId of the object to delete
* @param revision the revision of the object to delete
* @return a Future that completes when the object has been deleted
*/
@deprecated(message = "Use the deleteObject overload with revision instead.", since = "Akka 2.6.20")
def deleteObject(persistenceId: String): Future[Done]
/**
* Delete the object with the given `persistenceId` and `revision`.
*
* <p>
* Since Pekko v1.1, if the revision does not match the current revision
* of the object, the delete operation will fail. The returned Future
* will complete with a failed result wrapping the exception.
* </p>
*
* @param persistenceId the persistenceId of the object to delete
* @param revision the revision of the object to delete
* @return a Future that completes when the object has been deleted
*/
def deleteObject(persistenceId: String, revision: Long): Future[Done]
}

View file

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pekko.persistence.state.exception
import java.lang.invoke.{ MethodHandles, MethodType }
import scala.util.Try
import scala.util.control.NoStackTrace
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.matchers.should.Matchers
/**
* Tests for [[DurableStateException]]s.
* <p>
* To avoid making Pekko persistence implementations dependent on
* Pekko v1.1, these exceptions will be created using MethodHandles.
* </p>
*/
class DurableStateExceptionsSpec extends AnyWordSpecLike
with Matchers {
private val methodHandleLookup = MethodHandles.publicLookup()
"DurableStateException support" must {
"allow creating DeleteRevisionException using MethodHandle" in {
val exceptionClassOpt: Option[Class[_]] = Try(Class.forName(
"org.apache.pekko.persistence.state.exception.DeleteRevisionException")).toOption
exceptionClassOpt should not be empty
val constructorOpt = exceptionClassOpt.map { clz =>
val mt = MethodType.methodType(classOf[Unit], classOf[String])
methodHandleLookup.findConstructor(clz, mt)
}
constructorOpt should not be empty
val constructor = constructorOpt.get
val ex = constructor.invoke("delete failed").asInstanceOf[Exception]
ex shouldBe an[DeleteRevisionException]
ex shouldBe an[NoStackTrace]
ex.getMessage shouldEqual "delete failed"
}
}
}