diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 96eb781013..bf953bdc99 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -291,6 +291,9 @@ next message. If there is a problem with recovering the state of the actor from the journal when the actor is started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped. +If the ``deleteMessages`` fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) +and the actor continues with next message. + Atomic writes ------------- @@ -323,6 +326,9 @@ Message deletion To delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors may call the ``deleteMessages`` method. +If the delete fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) +and the actor continues with next message. + .. _persistent-views-java-lambda: Views diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index a727c2b43b..f3208affae 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -326,6 +326,12 @@ Message deletion To delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors may call the ``deleteMessages`` method. +If the delete fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) +and the actor continues with next message. + +If the ``deleteMessages`` fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) +and the actor continues with next message. + .. _persistent-views-java: Persistent Views diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index ab91c55a1a..64f00118c6 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -280,6 +280,9 @@ next message. If there is a problem with recovering the state of the actor from the journal when the actor is started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped. +If the ``deleteMessages`` fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) +and the actor continues with next message. + Atomic writes ------------- @@ -315,6 +318,9 @@ Message deletion To delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors may call the ``deleteMessages`` method. +If the delete fails ``onDeleteMessagesFailure`` will be called (logging a warning by default) +and the actor continues with next message. + .. _persistent-views: Persistent Views diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala index ba287ccece..72b0cd3eab 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -1,5 +1,6 @@ package akka.persistence.journal +import scala.concurrent.duration._ import scala.collection.immutable.Seq import akka.actor._ @@ -137,7 +138,8 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) { receiverProbe.expectMsg(ReplayMessagesSuccess) } "not replay permanently deleted messages (range deletion)" in { - val cmd = DeleteMessagesTo(pid, 3) + val receiverProbe2 = TestProbe() + val cmd = DeleteMessagesTo(pid, 3, receiverProbe2.ref) val sub = TestProbe() subscribe[DeleteMessagesTo](sub.ref) @@ -146,6 +148,8 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) { journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) List(4, 5) foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } + + receiverProbe2.expectNoMsg(200.millis) } "return a highest stored sequence number > 0 if the persistent actor has already written messages and the message log is non-empty" in { diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 91d873ffda..c794f6d4e6 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -147,6 +147,18 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas event.getClass.getName, seqNr, persistenceId, cause.getMessage) } + /** + * Called when ``deleteMessages`` failed. By default this method logs the problem + * as a warning, and the actor continues. + * + * @param cause failure cause + * @param toSequenceNr the sequence number parameter of the ``deleteMessages`` call + */ + protected def onDeleteMessagesFailure(cause: Throwable, toSequenceNr: Long): Unit = { + log.warning("Failed to deleteMessages toSequenceNr [{}] for persistenceId [{}] due to [{}].", + toSequenceNr, persistenceId, cause.getMessage) + } + /** * User-overridable callback. Called when a persistent actor is started or restarted. * Default implementation sends a `Recover()` to `self`. Note that if you override @@ -378,10 +390,12 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas /** * Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`. * + * If the delete fails [[#onDeleteMessagesFailure]] will be called. + * * @param toSequenceNr upper sequence number bound of persistent messages to be deleted. */ def deleteMessages(toSequenceNr: Long): Unit = - deleteMessages(toSequenceNr) + journal ! DeleteMessagesTo(persistenceId, toSequenceNr, self) /** * Returns `true` if this persistent actor is currently recovering. @@ -566,6 +580,10 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas case WriteMessagesFailed(_) ⇒ () // it will be stopped by the first WriteMessageFailure message + + case DeleteMessagesFailure(e, toSequenceNr) ⇒ + onDeleteMessagesFailure(e, toSequenceNr) + } def onWriteMessageComplete(err: Boolean): Unit = diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 29e5cf4f3d..ce512c2e67 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -25,14 +25,14 @@ private[persistence] object JournalProtocol { /** * Reply message to a failed [[DeleteMessagesTo]] request. */ - final case class DeleteMessagesFailure(cause: Throwable) + final case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long) extends Response /** * Request to delete all persistent messages with sequence numbers up to `toSequenceNr` * (inclusive). */ - final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long) + final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long, persistentActor: ActorRef) extends Request /** diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index e2826ea6c7..20a861a980 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -117,10 +117,10 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { case e ⇒ ReadHighestSequenceNrFailure(e) } pipeTo persistentActor - case d @ DeleteMessagesTo(persistenceId, toSequenceNr) ⇒ + case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) ⇒ asyncDeleteMessagesTo(persistenceId, toSequenceNr) onComplete { case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) - case Failure(e) ⇒ + case Failure(e) ⇒ persistentActor ! DeleteMessagesFailure(e, toSequenceNr) } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 945591afa7..8607f69d72 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -20,6 +20,7 @@ object PersistentActorSpec { final case class Cmd(data: Any) final case class Evt(data: Any) final case class LatchCmd(latch: TestLatch, data: Any) extends NoSerializationVerificationNeeded + final case class Delete(toSequenceNr: Long) abstract class ExamplePersistentActor(name: String) extends NamedPersistentActor(name) with PersistentActor { var events: List[Any] = Nil @@ -29,8 +30,9 @@ object PersistentActorSpec { } val commonBehavior: Receive = { - case "boom" ⇒ throw new TestException("boom") - case GetState ⇒ sender() ! events.reverse + case "boom" ⇒ throw new TestException("boom") + case Delete(toSequenceNr) ⇒ deleteMessages(toSequenceNr) + case GetState ⇒ sender() ! events.reverse } def receiveRecover = updateState @@ -892,6 +894,17 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectNoMsg(100.millis) } + "be able to delete events" in { + val persistentActor = namedPersistentActor[Behavior1PersistentActor] + persistentActor ! Cmd("b") + persistentActor ! GetState + expectMsg(List("a-1", "a-2", "b-1", "b-2")) + persistentActor ! Delete(2L) // delete "a-1" and "a-2" + persistentActor ! "boom" // restart, recover + persistentActor ! GetState + expectMsg(List("b-1", "b-2")) + } + } }