diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala index c880045a71..18f615683e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala @@ -56,7 +56,9 @@ trait AsyncRecovery { * * @param persistenceId persistent actor id. * @param fromSequenceNr hint where to start searching for the highest sequence - * number. + * number. When a persistent actor is recovering this + * `fromSequenceNr` will be the sequence number of the used + * snapshot or `0L` if no snapshot is used. */ def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] //#journal-plugin-api diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 3d4acbfd85..fc1ae4e0b8 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -124,7 +124,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { replayFilterWindowSize, replayFilterMaxOldWriters)) else persistentActor - breaker.withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, fromSequenceNr)) + val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1) + breaker.withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, readHighestSequenceNrFrom)) .flatMap { highSeqNr ⇒ val toSeqNr = math.min(toSequenceNr, highSeqNr) if (highSeqNr == 0L || fromSequenceNr > toSeqNr) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index 696fa088de..84435473ec 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -27,7 +27,8 @@ private[persistence] class LeveldbJournal extends { val configPath = "akka.persi override def receivePluginInternal: Receive = { case r @ ReplayTaggedMessages(fromSequenceNr, toSequenceNr, max, tag, replyTo) ⇒ import context.dispatcher - asyncReadHighestSequenceNr(tagAsPersistenceId(tag), fromSequenceNr) + val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1) + asyncReadHighestSequenceNr(tagAsPersistenceId(tag), readHighestSequenceNrFrom) .flatMap { highSeqNr ⇒ val toSeqNr = math.min(toSequenceNr, highSeqNr) if (highSeqNr == 0L || fromSequenceNr > toSeqNr) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala index 4b9105457f..ba8fc72cb3 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala @@ -50,7 +50,8 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le // TODO it would be nice to DRY this with AsyncWriteJournal, but this is using // AsyncWriteProxy message protocol val replyTo = sender() - asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).flatMap { highSeqNr ⇒ + val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1) + asyncReadHighestSequenceNr(persistenceId, readHighestSequenceNrFrom).flatMap { highSeqNr ⇒ if (highSeqNr == 0L || max == 0L) Future.successful(highSeqNr) else {