=per #15916 Read highestSeqNr first in replay

* we need to read the higestSeqNr anyway and it is better
  to do it first and limit the asyncReadHighestSequenceNr
  to that (instead of Long.MaxValue)
* return the highestSeqNr in the ReplayMessagesSuccess
* this also removes one become state in PersistentActor recovery
  logic
This commit is contained in:
Patrik Nordwall 2015-06-26 08:32:05 +02:00
parent aaa6b623e1
commit 09a2f9c248
12 changed files with 103 additions and 126 deletions

View file

@ -92,50 +92,54 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
"replay all messages" in {
journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
1 to 5 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess)
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L))
}
"replay messages using a lower sequence number bound" in {
journal ! ReplayMessages(3, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
3 to 5 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess)
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L))
}
"replay messages using an upper sequence number bound" in {
journal ! ReplayMessages(1, 3, Long.MaxValue, pid, receiverProbe.ref)
1 to 3 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess)
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L))
}
"replay messages using a count limit" in {
journal ! ReplayMessages(1, Long.MaxValue, 3, pid, receiverProbe.ref)
1 to 3 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess)
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L))
}
"replay messages using a lower and upper sequence number bound" in {
journal ! ReplayMessages(2, 4, Long.MaxValue, pid, receiverProbe.ref)
2 to 4 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess)
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L))
}
"replay messages using a lower and upper sequence number bound and a count limit" in {
journal ! ReplayMessages(2, 4, 2, pid, receiverProbe.ref)
2 to 3 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess)
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L))
}
"replay a single if lower sequence number bound equals upper sequence number bound" in {
journal ! ReplayMessages(2, 2, Long.MaxValue, pid, receiverProbe.ref)
2 to 2 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess)
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L))
}
"replay a single message if count limit equals 1" in {
journal ! ReplayMessages(2, 4, 1, pid, receiverProbe.ref)
2 to 2 foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
receiverProbe.expectMsg(ReplayMessagesSuccess)
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L))
}
"not replay messages if count limit equals 0" in {
journal ! ReplayMessages(2, 4, 0, pid, receiverProbe.ref)
receiverProbe.expectMsg(ReplayMessagesSuccess)
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L))
}
"not replay messages if lower sequence number bound is greater than upper sequence number bound" in {
journal ! ReplayMessages(3, 2, Long.MaxValue, pid, receiverProbe.ref)
receiverProbe.expectMsg(ReplayMessagesSuccess)
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L))
}
"not replay messages if the persistent actor has not yet written messages" in {
journal ! ReplayMessages(0, Long.MaxValue, Long.MaxValue, "non-existing-pid", receiverProbe.ref)
receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 0L))
}
"not replay permanently deleted messages (range deletion)" in {
val receiverProbe2 = TestProbe()
@ -152,18 +156,6 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
receiverProbe2.expectNoMsg(200.millis)
}
"return a highest stored sequence number > 0 if the persistent actor has already written messages and the message log is non-empty" in {
journal ! ReadHighestSequenceNr(3L, pid, receiverProbe.ref)
receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(5))
journal ! ReadHighestSequenceNr(5L, pid, receiverProbe.ref)
receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(5))
}
"return a highest stored sequence number == 0 if the persistent actor has not yet written messages" in {
journal ! ReadHighestSequenceNr(0L, "non-existing-pid", receiverProbe.ref)
receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(0))
}
"reject non-serializable events" in {
// there is no chance that a journal could create a data representation for type of event
val notSerializableEvent = new Object { override def toString = "not serializable" }

View file

@ -23,6 +23,10 @@ interface AsyncRecoveryPlugin {
* marked as deleted. In this case a replayed message's `deleted` method must
* return `true`.
*
* The `toSequenceNr` is the lowest of what was returned by
* {@link #doAsyncReadHighestSequenceNr} and what the user specified as
* recovery {@link akka.persistence.Recovery} parameter.
*
* @param persistenceId
* id of the persistent actor.
* @param fromSequenceNr
@ -39,7 +43,11 @@ interface AsyncRecoveryPlugin {
/**
* Java API, Plugin API: asynchronously reads the highest stored sequence
* number for the given `persistenceId`.
* number for the given `persistenceId`. The persistent actor will use the
* highest sequence number after recovery as the starting point when
* persisting new events. This sequence number is also used as `toSequenceNr`
* in subsequent call to [[#asyncReplayMessages]] unless the user has
* specified a lower `toSequenceNr`.
*
* @param persistenceId
* id of the persistent actor.

View file

@ -3,8 +3,6 @@
*/
package akka.persistence
import akka.persistence.JournalProtocol.ReplayMessagesSuccess
import scala.annotation.tailrec
import scala.collection.breakOut
import scala.collection.immutable

View file

@ -215,8 +215,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
override def unhandled(message: Any): Unit = {
message match {
case RecoveryCompleted | ReadHighestSequenceNrSuccess | ReadHighestSequenceNrFailure // mute
case m super.unhandled(m)
case RecoveryCompleted // mute
case m super.unhandled(m)
}
}
@ -466,8 +466,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* Processes replayed messages, if any. The actor's `receiveRecover` is invoked with the replayed
* events.
*
* If replay succeeds it switches to `initializing` state and requests the highest stored sequence
* number from the journal. Otherwise the actor is stopped.
* If replay succeeds it got highest stored sequence number response from the journal and then switches
* to `processingCommands` state. Otherwise the actor is stopped.
* If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`.
*
* All incoming messages are stashed.
@ -485,37 +485,15 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
case NonFatal(t)
try onReplayFailure(t, Some(p.payload)) finally context.stop(self)
}
case ReplayMessagesSuccess
case ReplayMessagesSuccess(highestSeqNr)
onReplaySuccess() // callback for subclass implementation
changeState(initializing(recoveryBehavior))
journal ! ReadHighestSequenceNr(lastSequenceNr, persistenceId, self)
case ReplayMessagesFailure(cause)
try onReplayFailure(cause, event = None) finally context.stop(self)
case other
internalStash.stash()
}
}
/**
* Processes the highest stored sequence number response from the journal and then switches
* to `processingCommands` state.
* All incoming messages are stashed.
*/
private def initializing(recoveryBehavior: Receive) = new State {
override def toString: String = "initializing"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
case ReadHighestSequenceNrSuccess(highest)
changeState(processingCommands)
sequenceNr = highest
setLastSequenceNr(highest)
sequenceNr = highestSeqNr
setLastSequenceNr(highestSeqNr)
internalStash.unstashAll()
Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
case ReadHighestSequenceNrFailure(cause)
log.error(cause, "PersistentActor could not retrieve highest sequence number and must " +
"therefore be stopped. (persisten id = [{}]).", persistenceId)
context.stop(self)
case ReplayMessagesFailure(cause)
try onReplayFailure(cause, event = None) finally context.stop(self)
case other
internalStash.stash()
}

View file

@ -122,8 +122,13 @@ private[persistence] object JournalProtocol {
/**
* Reply message to a successful [[ReplayMessages]] request. This reply is sent to the requestor
* after all [[ReplayedMessage]] have been sent (if any).
*
* It includes the highest stored sequence number of a given persistent actor. Note that the
* replay might have been limited to a lower sequence number.
*
* @param highestSequenceNr highest stored sequence number.
*/
case object ReplayMessagesSuccess
case class ReplayMessagesSuccess(highestSequenceNr: Long)
extends Response with DeadLetterSuppression
/**
@ -133,29 +138,4 @@ private[persistence] object JournalProtocol {
final case class ReplayMessagesFailure(cause: Throwable)
extends Response with DeadLetterSuppression
/**
* Request to read the highest stored sequence number of a given persistent actor.
*
* @param fromSequenceNr optional hint where to start searching for the maximum sequence number.
* @param persistenceId requesting persistent actor id.
* @param persistentActor requesting persistent actor.
*/
final case class ReadHighestSequenceNr(fromSequenceNr: Long = 1L, persistenceId: String, persistentActor: ActorRef)
extends Request
/**
* Reply message to a successful [[ReadHighestSequenceNr]] request.
*
* @param highestSequenceNr read highest sequence number.
*/
final case class ReadHighestSequenceNrSuccess(highestSequenceNr: Long)
extends Response
/**
* Reply message to a failed [[ReadHighestSequenceNr]] request.
*
* @param cause failure cause.
*/
final case class ReadHighestSequenceNrFailure(cause: Throwable)
extends Response
}

View file

@ -293,7 +293,7 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
case NonFatal(t)
changeState(ignoreRemainingReplay(t))
}
case ReplayMessagesSuccess
case _: ReplayMessagesSuccess
onReplayComplete()
case ReplayMessagesFailure(cause)
try onReplayError(cause) finally onReplayComplete()
@ -339,8 +339,8 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
// replay must be a full replay (up to the highest stored sequence number)
// Recover(lastSequenceNr) is sent by preRestart
setLastSequenceNr(Long.MaxValue)
case ReplayMessagesSuccess replayCompleted(receive)
case _ internalStash.stash()
case _: ReplayMessagesSuccess replayCompleted(receive)
case _ internalStash.stash()
}
def replayCompleted(receive: Receive): Unit = {

View file

@ -24,6 +24,9 @@ trait AsyncRecovery {
* as deleted. In this case a replayed message's `deleted` method must return
* `true`.
*
* The `toSequenceNr` is the lowest of what was returned by [[#asyncReadHighestSequenceNr]]
* and what the user specified as recovery [[akka.persistence.Recovery]] parameter.
*
* @param persistenceId persistent actor id.
* @param fromSequenceNr sequence number where replay should start (inclusive).
* @param toSequenceNr sequence number where replay should end (inclusive).
@ -38,7 +41,10 @@ trait AsyncRecovery {
/**
* Plugin API: asynchronously reads the highest stored sequence number for the
* given `persistenceId`.
* given `persistenceId`. The persistent actor will use the highest sequence
* number after recovery as the starting point when persisting new events.
* This sequence number is also used as `toSequenceNr` in subsequent call
* to [[#asyncReplayMessages]] unless the user has specified a lower `toSequenceNr`.
*
* @param persistenceId persistent actor id.
* @param fromSequenceNr hint where to start searching for the highest sequence

View file

@ -14,6 +14,7 @@ import scala.util.control.NonFatal
import scala.util.Try
import scala.util.Success
import scala.util.Failure
import akka.AkkaException
/**
* Abstract journal, optimized for asynchronous, non-blocking writes.
@ -93,30 +94,29 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
}
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor)
// Send replayed messages and replay result to persistentActor directly. No need
// to resequence replayed messages relative to written and looped messages.
asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p
if (!p.deleted) // old records from 2.3 may still have the deleted flag
adaptFromJournal(p).foreach { adaptedPersistentRepr
persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender)
}
} map {
case _ ReplayMessagesSuccess
} recover {
asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).flatMap { highSeqNr
val toSeqNr = math.min(toSequenceNr, highSeqNr)
if (highSeqNr == 0L || fromSequenceNr > toSeqNr)
Future.successful(highSeqNr)
else {
// Send replayed messages and replay result to persistentActor directly. No need
// to resequence replayed messages relative to written and looped messages.
asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p
if (!p.deleted) // old records from 2.3 may still have the deleted flag
adaptFromJournal(p).foreach { adaptedPersistentRepr
persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender)
}
}.map(_ highSeqNr)
}
}.map {
highSeqNr ReplayMessagesSuccess(highSeqNr)
}.recover {
case e ReplayMessagesFailure(e)
} pipeTo persistentActor onSuccess {
}.pipeTo(persistentActor).onSuccess {
case _ if publish context.system.eventStream.publish(r)
}
case ReadHighestSequenceNr(fromSequenceNr, persistenceId, persistentActor)
// Send read highest sequence number to persistentActor directly. No need
// to resequence the result relative to written and looped messages.
asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).map {
highest ReadHighestSequenceNrSuccess(highest)
} recover {
case e ReadHighestSequenceNrFailure(e)
} pipeTo persistentActor
case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor)
asyncDeleteMessagesTo(persistenceId, toSequenceNr) onComplete {
case Success(_) if (publish) context.system.eventStream.publish(d)

View file

@ -24,6 +24,7 @@ import scala.language.postfixOps
private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash with ActorLogging {
import AsyncWriteProxy._
import AsyncWriteTarget._
import context.dispatcher
private var isInitialized = false
private var store: ActorRef = _
@ -54,7 +55,9 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash
}
def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
(store ? ReadHighestSequenceNr(persistenceId, fromSequenceNr)).mapTo[Long]
(store ? ReplayMessages(persistenceId, fromSequenceNr = 0L, toSequenceNr = 0L, max = 0L)).map {
case ReplaySuccess(highest) highest
}
}
/**
@ -78,13 +81,11 @@ private[persistence] object AsyncWriteTarget {
final case class ReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)
@SerialVersionUID(1L)
case object ReplaySuccess
case class ReplaySuccess(highestSequenceNr: Long)
@SerialVersionUID(1L)
final case class ReplayFailure(cause: Throwable)
@SerialVersionUID(1L)
final case class ReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long)
}
/**
@ -100,7 +101,7 @@ private class ReplayMediator(replayCallback: PersistentRepr ⇒ Unit, replayComp
def receive = {
case p: PersistentRepr replayCallback(p)
case ReplaySuccess
case _: ReplaySuccess
replayCompletionPromise.success(())
context.stop(self)
case ReplayFailure(cause)

View file

@ -85,9 +85,9 @@ private[persistence] class InmemStore extends Actor with InmemMessages with Writ
case DeleteMessagesTo(pid, tsnr)
sender() ! (1L to tsnr foreach { snr delete(pid, snr) })
case ReplayMessages(pid, fromSnr, toSnr, max)
read(pid, fromSnr, toSnr, max).foreach { sender() ! _ }
sender() ! ReplaySuccess
case ReadHighestSequenceNr(persistenceId, _)
sender() ! highestSequenceNr(persistenceId)
val highest = highestSequenceNr(pid)
if (highest != 0L && max != 0L)
read(pid, fromSnr, math.min(toSnr, highest), max).foreach { sender() ! _ }
sender() ! ReplaySuccess(highest)
}
}

View file

@ -23,6 +23,8 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le
def receive = {
case WriteMessages(messages)
// TODO it would be nice to DRY this with AsyncWriteJournal, but this is using
// AsyncWriteProxy message protocol
val prepared = Try(preparePersistentBatch(messages))
val writeResult = (prepared match {
case Success(prep)
@ -43,13 +45,24 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le
case DeleteMessagesTo(pid, tsnr)
asyncDeleteMessagesTo(pid, tsnr).pipeTo(sender())
case ReadHighestSequenceNr(pid, fromSequenceNr)
asyncReadHighestSequenceNr(pid, fromSequenceNr).pipeTo(sender())
case ReplayMessages(pid, fromSnr, toSnr, max)
Try(replayMessages(numericId(pid), fromSnr, toSnr, max)(p adaptFromJournal(p).foreach { sender() ! _ })) match {
case Success(max) sender() ! ReplaySuccess
case Failure(cause) sender() ! ReplayFailure(cause)
}
case ReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max)
// TODO it would be nice to DRY this with AsyncWriteJournal, but this is using
// AsyncWriteProxy message protocol
val replyTo = sender()
asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).flatMap { highSeqNr
if (highSeqNr == 0L || max == 0L)
Future.successful(highSeqNr)
else {
val toSeqNr = math.min(toSequenceNr, highSeqNr)
asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p
if (!p.deleted) // old records from 2.3 may still have the deleted flag
adaptFromJournal(p).foreach(replyTo ! _)
}.map(_ highSeqNr)
}
}.map {
highSeqNr ReplaySuccess(highSeqNr)
}.recover {
case e ReplayFailure(e)
}.pipeTo(replyTo)
}
}

View file

@ -44,14 +44,15 @@ object PersistentActorFailureSpec {
case w: WriteMessages if checkSerializable(w).exists(_.isFailure)
sender() ! checkSerializable(w)
case ReplayMessages(pid, fromSnr, toSnr, max)
val highest = highestSequenceNr(pid)
val readFromStore = read(pid, fromSnr, toSnr, max)
if (readFromStore.length == 0)
sender() ! ReplaySuccess
sender() ! ReplaySuccess(highest)
else if (isCorrupt(readFromStore))
sender() ! ReplayFailure(new SimulatedException(s"blahonga $fromSnr $toSnr"))
else {
readFromStore.foreach(sender() ! _)
sender() ! ReplaySuccess
sender() ! ReplaySuccess(highest)
}
}