Enforce valid seqnr for deletes, #25487 (#25488)

This commit is contained in:
Patrik Nordwall 2018-08-20 08:14:56 +02:00 committed by Konrad `ktoso` Malawski
parent 97c8e5b6f8
commit 9e66f7121f
4 changed files with 31 additions and 6 deletions

View file

@ -427,10 +427,20 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
* If the delete is successful a [[DeleteMessagesSuccess]] will be sent to the actor.
* If the delete fails a [[DeleteMessagesFailure]] will be sent to the actor.
*
* The given `toSequenceNr` must be less than or equal to [[Eventsourced#lastSequenceNr]], otherwise
* [[DeleteMessagesFailure]] is sent to the actor without performing the delete. All persistent
* messages may be deleted without specifying the actual sequence number by using `Long.MaxValue`
* as the `toSequenceNr`.
*
* @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
*/
def deleteMessages(toSequenceNr: Long): Unit =
journal ! DeleteMessagesTo(persistenceId, toSequenceNr, self)
def deleteMessages(toSequenceNr: Long): Unit = {
if (toSequenceNr == Long.MaxValue || toSequenceNr <= lastSequenceNr)
journal ! DeleteMessagesTo(persistenceId, toSequenceNr, self)
else
self ! DeleteMessagesFailure(new RuntimeException(
s"toSequenceNr [$toSequenceNr] must be less than or equal to lastSequenceNr [$lastSequenceNr]"), toSequenceNr)
}
/**
* Returns `true` if this persistent actor is currently recovering.

View file

@ -24,7 +24,7 @@ private[persistence] object JournalProtocol {
/**
* Request to delete all persistent messages with sequence numbers up to `toSequenceNr`
* (inclusive).
* (inclusive). `Long.MaxValue` may be used as `toSequenceNr` to delete all persistent messages.
*/
final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long, persistentActor: ActorRef)
extends Request

View file

@ -56,7 +56,7 @@ class PersistentActorDeleteFailureSpec extends PersistenceSpec(PersistenceSpec.c
"have default warn logging be triggered, when deletion failed" in {
val persistentActor = system.actorOf(Props(classOf[DoesNotHandleDeleteFailureActor], name, testActor))
system.eventStream.subscribe(testActor, classOf[Logging.Warning])
persistentActor ! DeleteTo(100)
persistentActor ! DeleteTo(Long.MaxValue)
val message = expectMsgType[Warning].message.toString
message should include("Failed to deleteMessages")
message should include("Boom! Unable to delete events!") // the `cause` message
@ -65,9 +65,9 @@ class PersistentActorDeleteFailureSpec extends PersistenceSpec(PersistenceSpec.c
"be receive an DeleteMessagesFailure when deletion failed, and the default logging should not be triggered" in {
val persistentActor = system.actorOf(Props(classOf[HandlesDeleteFailureActor], name, testActor))
system.eventStream.subscribe(testActor, classOf[Logging.Warning])
persistentActor ! DeleteTo(100)
persistentActor ! DeleteTo(Long.MaxValue)
expectMsgType[DeleteMessagesFailure]
expectNoMsg(100.millis) // since the actor handled the message, we do not issue warn logging automatically
expectNoMessage(100.millis) // since the actor handled the message, we do not issue warn logging automatically
}
}

View file

@ -78,6 +78,9 @@ object PersistentActorSpec {
case d: DeleteMessagesSuccess
val replyTo = askedForDelete.getOrElse(throw new RuntimeException("Received DeleteMessagesSuccess without anyone asking for delete!"))
replyTo ! d
case d: DeleteMessagesFailure
val replyTo = askedForDelete.getOrElse(throw new RuntimeException("Received DeleteMessagesFailure without anyone asking for delete!"))
replyTo ! d
}
override protected def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit =
@ -1538,6 +1541,18 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
expectMsg(Nil)
}
"not be able to delete higher seqnr than current" in {
val persistentActor = behavior1PersistentActor
persistentActor ! Cmd("b")
persistentActor ! GetState
expectMsg(List("a-1", "a-2", "b-1", "b-2"))
persistentActor ! Delete(5L) // > current 4
persistentActor ! "boom" // restart, recover
expectMsgType[DeleteMessagesFailure].cause.getMessage should include("less than or equal to lastSequenceNr")
persistentActor ! GetState
expectMsg(List("a-1", "a-2", "b-1", "b-2"))
}
"recover the message which caused the restart" in {
val persistentActor = recoverMessageCausedRestart
persistentActor ! "Boom"