Merge pull request #17843 from akka/wip-17799-deleteMessages-refactoring-bug-patriknw

+per #17799 Fix refactoring bug in deleteMessages
This commit is contained in:
Patrik Nordwall 2015-06-26 12:06:07 +02:00
commit b0c2817a36
8 changed files with 61 additions and 8 deletions

View file

@ -291,6 +291,9 @@ next message.
If there is a problem with recovering the state of the actor from the journal when the actor is
started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped.
If the ``deleteMessages`` fails ``onDeleteMessagesFailure`` will be called (logging a warning by default)
and the actor continues with next message.
Atomic writes
-------------
@ -323,6 +326,9 @@ Message deletion
To delete all messages (journaled by a single persistent actor) up to a specified sequence number,
persistent actors may call the ``deleteMessages`` method.
If the delete fails ``onDeleteMessagesFailure`` will be called (logging a warning by default)
and the actor continues with next message.
.. _persistent-views-java-lambda:
Views

View file

@ -326,6 +326,12 @@ Message deletion
To delete all messages (journaled by a single persistent actor) up to a specified sequence number,
persistent actors may call the ``deleteMessages`` method.
If the delete fails ``onDeleteMessagesFailure`` will be called (logging a warning by default)
and the actor continues with next message.
If the ``deleteMessages`` fails ``onDeleteMessagesFailure`` will be called (logging a warning by default)
and the actor continues with next message.
.. _persistent-views-java:
Persistent Views

View file

@ -280,6 +280,9 @@ next message.
If there is a problem with recovering the state of the actor from the journal when the actor is
started, ``onReplayFailure`` is called (logging the error by default) and the actor will be stopped.
If the ``deleteMessages`` fails ``onDeleteMessagesFailure`` will be called (logging a warning by default)
and the actor continues with next message.
Atomic writes
-------------
@ -315,6 +318,9 @@ Message deletion
To delete all messages (journaled by a single persistent actor) up to a specified sequence number,
persistent actors may call the ``deleteMessages`` method.
If the delete fails ``onDeleteMessagesFailure`` will be called (logging a warning by default)
and the actor continues with next message.
.. _persistent-views:
Persistent Views

View file

@ -1,5 +1,6 @@
package akka.persistence.journal
import scala.concurrent.duration._
import scala.collection.immutable.Seq
import akka.actor._
@ -137,7 +138,8 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
receiverProbe.expectMsg(ReplayMessagesSuccess)
}
"not replay permanently deleted messages (range deletion)" in {
val cmd = DeleteMessagesTo(pid, 3)
val receiverProbe2 = TestProbe()
val cmd = DeleteMessagesTo(pid, 3, receiverProbe2.ref)
val sub = TestProbe()
subscribe[DeleteMessagesTo](sub.ref)
@ -146,6 +148,8 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
List(4, 5) foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe2.expectNoMsg(200.millis)
}
"return a highest stored sequence number > 0 if the persistent actor has already written messages and the message log is non-empty" in {

View file

@ -147,6 +147,18 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
event.getClass.getName, seqNr, persistenceId, cause.getMessage)
}
/**
* Called when ``deleteMessages`` failed. By default this method logs the problem
* as a warning, and the actor continues.
*
* @param cause failure cause
* @param toSequenceNr the sequence number parameter of the ``deleteMessages`` call
*/
protected def onDeleteMessagesFailure(cause: Throwable, toSequenceNr: Long): Unit = {
log.warning("Failed to deleteMessages toSequenceNr [{}] for persistenceId [{}] due to [{}].",
toSequenceNr, persistenceId, cause.getMessage)
}
/**
* User-overridable callback. Called when a persistent actor is started or restarted.
* Default implementation sends a `Recover()` to `self`. Note that if you override
@ -378,10 +390,12 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
/**
* Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`.
*
* If the delete fails [[#onDeleteMessagesFailure]] will be called.
*
* @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
*/
def deleteMessages(toSequenceNr: Long): Unit =
deleteMessages(toSequenceNr)
journal ! DeleteMessagesTo(persistenceId, toSequenceNr, self)
/**
* Returns `true` if this persistent actor is currently recovering.
@ -566,6 +580,10 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
case WriteMessagesFailed(_)
() // it will be stopped by the first WriteMessageFailure message
case DeleteMessagesFailure(e, toSequenceNr)
onDeleteMessagesFailure(e, toSequenceNr)
}
def onWriteMessageComplete(err: Boolean): Unit =

View file

@ -25,14 +25,14 @@ private[persistence] object JournalProtocol {
/**
* Reply message to a failed [[DeleteMessagesTo]] request.
*/
final case class DeleteMessagesFailure(cause: Throwable)
final case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long)
extends Response
/**
* Request to delete all persistent messages with sequence numbers up to `toSequenceNr`
* (inclusive).
*/
final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long)
final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long, persistentActor: ActorRef)
extends Request
/**

View file

@ -117,10 +117,10 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
case e ReadHighestSequenceNrFailure(e)
} pipeTo persistentActor
case d @ DeleteMessagesTo(persistenceId, toSequenceNr)
case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor)
asyncDeleteMessagesTo(persistenceId, toSequenceNr) onComplete {
case Success(_) if (publish) context.system.eventStream.publish(d)
case Failure(e)
case Failure(e) persistentActor ! DeleteMessagesFailure(e, toSequenceNr)
}
}

View file

@ -20,6 +20,7 @@ object PersistentActorSpec {
final case class Cmd(data: Any)
final case class Evt(data: Any)
final case class LatchCmd(latch: TestLatch, data: Any) extends NoSerializationVerificationNeeded
final case class Delete(toSequenceNr: Long)
abstract class ExamplePersistentActor(name: String) extends NamedPersistentActor(name) with PersistentActor {
var events: List[Any] = Nil
@ -29,8 +30,9 @@ object PersistentActorSpec {
}
val commonBehavior: Receive = {
case "boom" throw new TestException("boom")
case GetState sender() ! events.reverse
case "boom" throw new TestException("boom")
case Delete(toSequenceNr) deleteMessages(toSequenceNr)
case GetState sender() ! events.reverse
}
def receiveRecover = updateState
@ -892,6 +894,17 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
expectNoMsg(100.millis)
}
"be able to delete events" in {
val persistentActor = namedPersistentActor[Behavior1PersistentActor]
persistentActor ! Cmd("b")
persistentActor ! GetState
expectMsg(List("a-1", "a-2", "b-1", "b-2"))
persistentActor ! Delete(2L) // delete "a-1" and "a-2"
persistentActor ! "boom" // restart, recover
persistentActor ! GetState
expectMsg(List("b-1", "b-2"))
}
}
}