From af3bc52480fddeac3dcbe6080eb59eb9a7595938 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 28 Aug 2015 16:05:59 +0200 Subject: [PATCH] =per #18296 Use 0L or snapshot seqNr as asyncReadHighestSequenceNr param For example, a new persistent actor (no snapshots, no events) should use 0L so that it makes sense that journal returns 0L and the first persisted event gets 1L. --- .../main/scala/akka/persistence/journal/AsyncRecovery.scala | 4 +++- .../scala/akka/persistence/journal/AsyncWriteJournal.scala | 3 ++- .../akka/persistence/journal/leveldb/LeveldbJournal.scala | 3 ++- .../akka/persistence/journal/leveldb/SharedLeveldbStore.scala | 3 ++- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala index c880045a71..18f615683e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala @@ -56,7 +56,9 @@ trait AsyncRecovery { * * @param persistenceId persistent actor id. * @param fromSequenceNr hint where to start searching for the highest sequence - * number. + * number. When a persistent actor is recovering this + * `fromSequenceNr` will be the sequence number of the used + * snapshot or `0L` if no snapshot is used. */ def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] //#journal-plugin-api diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 3d4acbfd85..fc1ae4e0b8 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -124,7 +124,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { replayFilterWindowSize, replayFilterMaxOldWriters)) else persistentActor - breaker.withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, fromSequenceNr)) + val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1) + breaker.withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, readHighestSequenceNrFrom)) .flatMap { highSeqNr ⇒ val toSeqNr = math.min(toSequenceNr, highSeqNr) if (highSeqNr == 0L || fromSequenceNr > toSeqNr) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index 696fa088de..84435473ec 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -27,7 +27,8 @@ private[persistence] class LeveldbJournal extends { val configPath = "akka.persi override def receivePluginInternal: Receive = { case r @ ReplayTaggedMessages(fromSequenceNr, toSequenceNr, max, tag, replyTo) ⇒ import context.dispatcher - asyncReadHighestSequenceNr(tagAsPersistenceId(tag), fromSequenceNr) + val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1) + asyncReadHighestSequenceNr(tagAsPersistenceId(tag), readHighestSequenceNrFrom) .flatMap { highSeqNr ⇒ val toSeqNr = math.min(toSequenceNr, highSeqNr) if (highSeqNr == 0L || fromSequenceNr > toSeqNr) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala index 4b9105457f..ba8fc72cb3 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala @@ -50,7 +50,8 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le // TODO it would be nice to DRY this with AsyncWriteJournal, but this is using // AsyncWriteProxy message protocol val replyTo = sender() - asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).flatMap { highSeqNr ⇒ + val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1) + asyncReadHighestSequenceNr(persistenceId, readHighestSequenceNrFrom).flatMap { highSeqNr ⇒ if (highSeqNr == 0L || max == 0L) Future.successful(highSeqNr) else {