diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 739a6dd9df..9b1ac8d859 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -427,10 +427,20 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas * If the delete is successful a [[DeleteMessagesSuccess]] will be sent to the actor. * If the delete fails a [[DeleteMessagesFailure]] will be sent to the actor. * + * The given `toSequenceNr` must be less than or equal to [[Eventsourced#lastSequenceNr]], otherwise + * [[DeleteMessagesFailure]] is sent to the actor without performing the delete. All persistent + * messages may be deleted without specifying the actual sequence number by using `Long.MaxValue` + * as the `toSequenceNr`. + * * @param toSequenceNr upper sequence number bound of persistent messages to be deleted. */ - def deleteMessages(toSequenceNr: Long): Unit = - journal ! DeleteMessagesTo(persistenceId, toSequenceNr, self) + def deleteMessages(toSequenceNr: Long): Unit = { + if (toSequenceNr == Long.MaxValue || toSequenceNr <= lastSequenceNr) + journal ! DeleteMessagesTo(persistenceId, toSequenceNr, self) + else + self ! DeleteMessagesFailure(new RuntimeException( + s"toSequenceNr [$toSequenceNr] must be less than or equal to lastSequenceNr [$lastSequenceNr]"), toSequenceNr) + } /** * Returns `true` if this persistent actor is currently recovering. diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 1a7876fdf3..c3fee2a1c9 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -24,7 +24,7 @@ private[persistence] object JournalProtocol { /** * Request to delete all persistent messages with sequence numbers up to `toSequenceNr` - * (inclusive). + * (inclusive). `Long.MaxValue` may be used as `toSequenceNr` to delete all persistent messages. */ final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long, persistentActor: ActorRef) extends Request diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorDeleteFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorDeleteFailureSpec.scala index c393e4cd05..96a0fc8106 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorDeleteFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorDeleteFailureSpec.scala @@ -56,7 +56,7 @@ class PersistentActorDeleteFailureSpec extends PersistenceSpec(PersistenceSpec.c "have default warn logging be triggered, when deletion failed" in { val persistentActor = system.actorOf(Props(classOf[DoesNotHandleDeleteFailureActor], name, testActor)) system.eventStream.subscribe(testActor, classOf[Logging.Warning]) - persistentActor ! DeleteTo(100) + persistentActor ! DeleteTo(Long.MaxValue) val message = expectMsgType[Warning].message.toString message should include("Failed to deleteMessages") message should include("Boom! Unable to delete events!") // the `cause` message @@ -65,9 +65,9 @@ class PersistentActorDeleteFailureSpec extends PersistenceSpec(PersistenceSpec.c "be receive an DeleteMessagesFailure when deletion failed, and the default logging should not be triggered" in { val persistentActor = system.actorOf(Props(classOf[HandlesDeleteFailureActor], name, testActor)) system.eventStream.subscribe(testActor, classOf[Logging.Warning]) - persistentActor ! DeleteTo(100) + persistentActor ! DeleteTo(Long.MaxValue) expectMsgType[DeleteMessagesFailure] - expectNoMsg(100.millis) // since the actor handled the message, we do not issue warn logging automatically + expectNoMessage(100.millis) // since the actor handled the message, we do not issue warn logging automatically } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index e1caa3230c..8461c34ed4 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -78,6 +78,9 @@ object PersistentActorSpec { case d: DeleteMessagesSuccess ⇒ val replyTo = askedForDelete.getOrElse(throw new RuntimeException("Received DeleteMessagesSuccess without anyone asking for delete!")) replyTo ! d + case d: DeleteMessagesFailure ⇒ + val replyTo = askedForDelete.getOrElse(throw new RuntimeException("Received DeleteMessagesFailure without anyone asking for delete!")) + replyTo ! d } override protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = @@ -1538,6 +1541,18 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg(Nil) } + "not be able to delete higher seqnr than current" in { + val persistentActor = behavior1PersistentActor + persistentActor ! Cmd("b") + persistentActor ! GetState + expectMsg(List("a-1", "a-2", "b-1", "b-2")) + persistentActor ! Delete(5L) // > current 4 + persistentActor ! "boom" // restart, recover + expectMsgType[DeleteMessagesFailure].cause.getMessage should include("less than or equal to lastSequenceNr") + persistentActor ! GetState + expectMsg(List("a-1", "a-2", "b-1", "b-2")) + } + "recover the message which caused the restart" in { val persistentActor = recoverMessageCausedRestart persistentActor ! "Boom"