diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala index 72b0cd3eab..64dd33bb34 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -92,50 +92,54 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) { "replay all messages" in { journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) 1 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess) + receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) } "replay messages using a lower sequence number bound" in { journal ! ReplayMessages(3, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) 3 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess) + receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) } "replay messages using an upper sequence number bound" in { journal ! ReplayMessages(1, 3, Long.MaxValue, pid, receiverProbe.ref) 1 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess) + receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) } "replay messages using a count limit" in { journal ! ReplayMessages(1, Long.MaxValue, 3, pid, receiverProbe.ref) 1 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess) + receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) } "replay messages using a lower and upper sequence number bound" in { journal ! ReplayMessages(2, 4, Long.MaxValue, pid, receiverProbe.ref) 2 to 4 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess) + receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) } "replay messages using a lower and upper sequence number bound and a count limit" in { journal ! ReplayMessages(2, 4, 2, pid, receiverProbe.ref) 2 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess) + receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) } "replay a single if lower sequence number bound equals upper sequence number bound" in { journal ! ReplayMessages(2, 2, Long.MaxValue, pid, receiverProbe.ref) 2 to 2 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess) + receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) } "replay a single message if count limit equals 1" in { journal ! ReplayMessages(2, 4, 1, pid, receiverProbe.ref) 2 to 2 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } - receiverProbe.expectMsg(ReplayMessagesSuccess) + receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) } "not replay messages if count limit equals 0" in { journal ! ReplayMessages(2, 4, 0, pid, receiverProbe.ref) - receiverProbe.expectMsg(ReplayMessagesSuccess) + receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) } "not replay messages if lower sequence number bound is greater than upper sequence number bound" in { journal ! ReplayMessages(3, 2, Long.MaxValue, pid, receiverProbe.ref) - receiverProbe.expectMsg(ReplayMessagesSuccess) + receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 5L)) + } + "not replay messages if the persistent actor has not yet written messages" in { + journal ! ReplayMessages(0, Long.MaxValue, Long.MaxValue, "non-existing-pid", receiverProbe.ref) + receiverProbe.expectMsg(ReplayMessagesSuccess(highestSequenceNr = 0L)) } "not replay permanently deleted messages (range deletion)" in { val receiverProbe2 = TestProbe() @@ -152,18 +156,6 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) { receiverProbe2.expectNoMsg(200.millis) } - "return a highest stored sequence number > 0 if the persistent actor has already written messages and the message log is non-empty" in { - journal ! ReadHighestSequenceNr(3L, pid, receiverProbe.ref) - receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(5)) - - journal ! ReadHighestSequenceNr(5L, pid, receiverProbe.ref) - receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(5)) - } - "return a highest stored sequence number == 0 if the persistent actor has not yet written messages" in { - journal ! ReadHighestSequenceNr(0L, "non-existing-pid", receiverProbe.ref) - receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(0)) - } - "reject non-serializable events" in { // there is no chance that a journal could create a data representation for type of event val notSerializableEvent = new Object { override def toString = "not serializable" } diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java index c1b95cafbc..553dd47af2 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java @@ -23,6 +23,10 @@ interface AsyncRecoveryPlugin { * marked as deleted. In this case a replayed message's `deleted` method must * return `true`. * + * The `toSequenceNr` is the lowest of what was returned by + * {@link #doAsyncReadHighestSequenceNr} and what the user specified as + * recovery {@link akka.persistence.Recovery} parameter. + * * @param persistenceId * id of the persistent actor. * @param fromSequenceNr @@ -34,12 +38,16 @@ interface AsyncRecoveryPlugin { * @param replayCallback * called to replay a single message. Can be called from any thread. */ - Future doAsyncReplayMessages(String persistenceId, long fromSequenceNr, + Future doAsyncReplayMessages(String persistenceId, long fromSequenceNr, long toSequenceNr, long max, Consumer replayCallback); /** * Java API, Plugin API: asynchronously reads the highest stored sequence - * number for the given `persistenceId`. + * number for the given `persistenceId`. The persistent actor will use the + * highest sequence number after recovery as the starting point when + * persisting new events. This sequence number is also used as `toSequenceNr` + * in subsequent call to [[#asyncReplayMessages]] unless the user has + * specified a lower `toSequenceNr`. * * @param persistenceId * id of the persistent actor. diff --git a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala index 53020b460c..dd3f09dce7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala @@ -3,8 +3,6 @@ */ package akka.persistence -import akka.persistence.JournalProtocol.ReplayMessagesSuccess - import scala.annotation.tailrec import scala.collection.breakOut import scala.collection.immutable diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 675bc2e398..181e54117a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -215,8 +215,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas override def unhandled(message: Any): Unit = { message match { - case RecoveryCompleted | ReadHighestSequenceNrSuccess | ReadHighestSequenceNrFailure ⇒ // mute - case m ⇒ super.unhandled(m) + case RecoveryCompleted ⇒ // mute + case m ⇒ super.unhandled(m) } } @@ -466,8 +466,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * Processes replayed messages, if any. The actor's `receiveRecover` is invoked with the replayed * events. * - * If replay succeeds it switches to `initializing` state and requests the highest stored sequence - * number from the journal. Otherwise the actor is stopped. + * If replay succeeds it got highest stored sequence number response from the journal and then switches + * to `processingCommands` state. Otherwise the actor is stopped. * If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`. * * All incoming messages are stashed. @@ -485,37 +485,15 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas case NonFatal(t) ⇒ try onReplayFailure(t, Some(p.payload)) finally context.stop(self) } - case ReplayMessagesSuccess ⇒ + case ReplayMessagesSuccess(highestSeqNr) ⇒ onReplaySuccess() // callback for subclass implementation - changeState(initializing(recoveryBehavior)) - journal ! ReadHighestSequenceNr(lastSequenceNr, persistenceId, self) - case ReplayMessagesFailure(cause) ⇒ - try onReplayFailure(cause, event = None) finally context.stop(self) - case other ⇒ - internalStash.stash() - } - } - - /** - * Processes the highest stored sequence number response from the journal and then switches - * to `processingCommands` state. - * All incoming messages are stashed. - */ - private def initializing(recoveryBehavior: Receive) = new State { - override def toString: String = "initializing" - override def recoveryRunning: Boolean = true - - override def stateReceive(receive: Receive, message: Any) = message match { - case ReadHighestSequenceNrSuccess(highest) ⇒ changeState(processingCommands) - sequenceNr = highest - setLastSequenceNr(highest) + sequenceNr = highestSeqNr + setLastSequenceNr(highestSeqNr) internalStash.unstashAll() Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted) - case ReadHighestSequenceNrFailure(cause) ⇒ - log.error(cause, "PersistentActor could not retrieve highest sequence number and must " + - "therefore be stopped. (persisten id = [{}]).", persistenceId) - context.stop(self) + case ReplayMessagesFailure(cause) ⇒ + try onReplayFailure(cause, event = None) finally context.stop(self) case other ⇒ internalStash.stash() } diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index ce512c2e67..5a3d72cd5a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -122,8 +122,13 @@ private[persistence] object JournalProtocol { /** * Reply message to a successful [[ReplayMessages]] request. This reply is sent to the requestor * after all [[ReplayedMessage]] have been sent (if any). + * + * It includes the highest stored sequence number of a given persistent actor. Note that the + * replay might have been limited to a lower sequence number. + * + * @param highestSequenceNr highest stored sequence number. */ - case object ReplayMessagesSuccess + case class ReplayMessagesSuccess(highestSequenceNr: Long) extends Response with DeadLetterSuppression /** @@ -133,29 +138,4 @@ private[persistence] object JournalProtocol { final case class ReplayMessagesFailure(cause: Throwable) extends Response with DeadLetterSuppression - /** - * Request to read the highest stored sequence number of a given persistent actor. - * - * @param fromSequenceNr optional hint where to start searching for the maximum sequence number. - * @param persistenceId requesting persistent actor id. - * @param persistentActor requesting persistent actor. - */ - final case class ReadHighestSequenceNr(fromSequenceNr: Long = 1L, persistenceId: String, persistentActor: ActorRef) - extends Request - - /** - * Reply message to a successful [[ReadHighestSequenceNr]] request. - * - * @param highestSequenceNr read highest sequence number. - */ - final case class ReadHighestSequenceNrSuccess(highestSequenceNr: Long) - extends Response - - /** - * Reply message to a failed [[ReadHighestSequenceNr]] request. - * - * @param cause failure cause. - */ - final case class ReadHighestSequenceNrFailure(cause: Throwable) - extends Response } diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala index e21370f211..6e22a7b683 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala @@ -293,7 +293,7 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory case NonFatal(t) ⇒ changeState(ignoreRemainingReplay(t)) } - case ReplayMessagesSuccess ⇒ + case _: ReplayMessagesSuccess ⇒ onReplayComplete() case ReplayMessagesFailure(cause) ⇒ try onReplayError(cause) finally onReplayComplete() @@ -339,8 +339,8 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory // replay must be a full replay (up to the highest stored sequence number) // Recover(lastSequenceNr) is sent by preRestart setLastSequenceNr(Long.MaxValue) - case ReplayMessagesSuccess ⇒ replayCompleted(receive) - case _ ⇒ internalStash.stash() + case _: ReplayMessagesSuccess ⇒ replayCompleted(receive) + case _ ⇒ internalStash.stash() } def replayCompleted(receive: Receive): Unit = { diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala index a009f100df..5b3c6f9912 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala @@ -24,6 +24,9 @@ trait AsyncRecovery { * as deleted. In this case a replayed message's `deleted` method must return * `true`. * + * The `toSequenceNr` is the lowest of what was returned by [[#asyncReadHighestSequenceNr]] + * and what the user specified as recovery [[akka.persistence.Recovery]] parameter. + * * @param persistenceId persistent actor id. * @param fromSequenceNr sequence number where replay should start (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive). @@ -38,7 +41,10 @@ trait AsyncRecovery { /** * Plugin API: asynchronously reads the highest stored sequence number for the - * given `persistenceId`. + * given `persistenceId`. The persistent actor will use the highest sequence + * number after recovery as the starting point when persisting new events. + * This sequence number is also used as `toSequenceNr` in subsequent call + * to [[#asyncReplayMessages]] unless the user has specified a lower `toSequenceNr`. * * @param persistenceId persistent actor id. * @param fromSequenceNr hint where to start searching for the highest sequence diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 20a861a980..3b4ab470a7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -14,6 +14,7 @@ import scala.util.control.NonFatal import scala.util.Try import scala.util.Success import scala.util.Failure +import akka.AkkaException /** * Abstract journal, optimized for asynchronous, non-blocking writes. @@ -93,30 +94,29 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) ⇒ - // Send replayed messages and replay result to persistentActor directly. No need - // to resequence replayed messages relative to written and looped messages. - asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒ - if (!p.deleted) // old records from 2.3 may still have the deleted flag - adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒ - persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender) - } - } map { - case _ ⇒ ReplayMessagesSuccess - } recover { + + asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).flatMap { highSeqNr ⇒ + val toSeqNr = math.min(toSequenceNr, highSeqNr) + if (highSeqNr == 0L || fromSequenceNr > toSeqNr) + Future.successful(highSeqNr) + else { + // Send replayed messages and replay result to persistentActor directly. No need + // to resequence replayed messages relative to written and looped messages. + asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p ⇒ + if (!p.deleted) // old records from 2.3 may still have the deleted flag + adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒ + persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender) + } + }.map(_ ⇒ highSeqNr) + } + }.map { + highSeqNr ⇒ ReplayMessagesSuccess(highSeqNr) + }.recover { case e ⇒ ReplayMessagesFailure(e) - } pipeTo persistentActor onSuccess { + }.pipeTo(persistentActor).onSuccess { case _ if publish ⇒ context.system.eventStream.publish(r) } - case ReadHighestSequenceNr(fromSequenceNr, persistenceId, persistentActor) ⇒ - // Send read highest sequence number to persistentActor directly. No need - // to resequence the result relative to written and looped messages. - asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).map { - highest ⇒ ReadHighestSequenceNrSuccess(highest) - } recover { - case e ⇒ ReadHighestSequenceNrFailure(e) - } pipeTo persistentActor - case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) ⇒ asyncDeleteMessagesTo(persistenceId, toSequenceNr) onComplete { case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala index dba13ebe3e..3241d92f9d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala @@ -24,6 +24,7 @@ import scala.language.postfixOps private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash with ActorLogging { import AsyncWriteProxy._ import AsyncWriteTarget._ + import context.dispatcher private var isInitialized = false private var store: ActorRef = _ @@ -54,7 +55,9 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash } def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = - (store ? ReadHighestSequenceNr(persistenceId, fromSequenceNr)).mapTo[Long] + (store ? ReplayMessages(persistenceId, fromSequenceNr = 0L, toSequenceNr = 0L, max = 0L)).map { + case ReplaySuccess(highest) ⇒ highest + } } /** @@ -78,13 +81,11 @@ private[persistence] object AsyncWriteTarget { final case class ReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long) @SerialVersionUID(1L) - case object ReplaySuccess + case class ReplaySuccess(highestSequenceNr: Long) @SerialVersionUID(1L) final case class ReplayFailure(cause: Throwable) - @SerialVersionUID(1L) - final case class ReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long) } /** @@ -100,7 +101,7 @@ private class ReplayMediator(replayCallback: PersistentRepr ⇒ Unit, replayComp def receive = { case p: PersistentRepr ⇒ replayCallback(p) - case ReplaySuccess ⇒ + case _: ReplaySuccess ⇒ replayCompletionPromise.success(()) context.stop(self) case ReplayFailure(cause) ⇒ diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index 7418d55a95..6379b38586 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -85,9 +85,9 @@ private[persistence] class InmemStore extends Actor with InmemMessages with Writ case DeleteMessagesTo(pid, tsnr) ⇒ sender() ! (1L to tsnr foreach { snr ⇒ delete(pid, snr) }) case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ - read(pid, fromSnr, toSnr, max).foreach { sender() ! _ } - sender() ! ReplaySuccess - case ReadHighestSequenceNr(persistenceId, _) ⇒ - sender() ! highestSequenceNr(persistenceId) + val highest = highestSequenceNr(pid) + if (highest != 0L && max != 0L) + read(pid, fromSnr, math.min(toSnr, highest), max).foreach { sender() ! _ } + sender() ! ReplaySuccess(highest) } } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala index 34170e8651..360c6cfb70 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala @@ -23,6 +23,8 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le def receive = { case WriteMessages(messages) ⇒ + // TODO it would be nice to DRY this with AsyncWriteJournal, but this is using + // AsyncWriteProxy message protocol val prepared = Try(preparePersistentBatch(messages)) val writeResult = (prepared match { case Success(prep) ⇒ @@ -43,13 +45,24 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le case DeleteMessagesTo(pid, tsnr) ⇒ asyncDeleteMessagesTo(pid, tsnr).pipeTo(sender()) - case ReadHighestSequenceNr(pid, fromSequenceNr) ⇒ - asyncReadHighestSequenceNr(pid, fromSequenceNr).pipeTo(sender()) - - case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ - Try(replayMessages(numericId(pid), fromSnr, toSnr, max)(p ⇒ adaptFromJournal(p).foreach { sender() ! _ })) match { - case Success(max) ⇒ sender() ! ReplaySuccess - case Failure(cause) ⇒ sender() ! ReplayFailure(cause) - } + case ReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) ⇒ + // TODO it would be nice to DRY this with AsyncWriteJournal, but this is using + // AsyncWriteProxy message protocol + val replyTo = sender() + asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).flatMap { highSeqNr ⇒ + if (highSeqNr == 0L || max == 0L) + Future.successful(highSeqNr) + else { + val toSeqNr = math.min(toSequenceNr, highSeqNr) + asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p ⇒ + if (!p.deleted) // old records from 2.3 may still have the deleted flag + adaptFromJournal(p).foreach(replyTo ! _) + }.map(_ ⇒ highSeqNr) + } + }.map { + highSeqNr ⇒ ReplaySuccess(highSeqNr) + }.recover { + case e ⇒ ReplayFailure(e) + }.pipeTo(replyTo) } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala index f377201f55..06cee55e6f 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala @@ -44,14 +44,15 @@ object PersistentActorFailureSpec { case w: WriteMessages if checkSerializable(w).exists(_.isFailure) ⇒ sender() ! checkSerializable(w) case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ + val highest = highestSequenceNr(pid) val readFromStore = read(pid, fromSnr, toSnr, max) if (readFromStore.length == 0) - sender() ! ReplaySuccess + sender() ! ReplaySuccess(highest) else if (isCorrupt(readFromStore)) sender() ! ReplayFailure(new SimulatedException(s"blahonga $fromSnr $toSnr")) else { readFromStore.foreach(sender() ! _) - sender() ! ReplaySuccess + sender() ! ReplaySuccess(highest) } }