Merge pull request #18355 from akka/wip-18296-readHighestSequenceNr-patriknw

=per #18296 Use 0L or snapshot seqNr as asyncReadHighestSequenceNr param
This commit is contained in:
Patrik Nordwall 2015-09-04 11:32:02 +02:00
commit 40936e8333
4 changed files with 9 additions and 4 deletions

View file

@ -56,7 +56,9 @@ trait AsyncRecovery {
*
* @param persistenceId persistent actor id.
* @param fromSequenceNr hint where to start searching for the highest sequence
* number.
* number. When a persistent actor is recovering this
* `fromSequenceNr` will be the sequence number of the used
* snapshot or `0L` if no snapshot is used.
*/
def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]
//#journal-plugin-api

View file

@ -124,7 +124,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
replayFilterWindowSize, replayFilterMaxOldWriters))
else persistentActor
breaker.withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, fromSequenceNr))
val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1)
breaker.withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, readHighestSequenceNrFrom))
.flatMap { highSeqNr
val toSeqNr = math.min(toSequenceNr, highSeqNr)
if (highSeqNr == 0L || fromSequenceNr > toSeqNr)

View file

@ -27,7 +27,8 @@ private[persistence] class LeveldbJournal extends { val configPath = "akka.persi
override def receivePluginInternal: Receive = {
case r @ ReplayTaggedMessages(fromSequenceNr, toSequenceNr, max, tag, replyTo)
import context.dispatcher
asyncReadHighestSequenceNr(tagAsPersistenceId(tag), fromSequenceNr)
val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1)
asyncReadHighestSequenceNr(tagAsPersistenceId(tag), readHighestSequenceNrFrom)
.flatMap { highSeqNr
val toSeqNr = math.min(toSequenceNr, highSeqNr)
if (highSeqNr == 0L || fromSequenceNr > toSeqNr)

View file

@ -50,7 +50,8 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le
// TODO it would be nice to DRY this with AsyncWriteJournal, but this is using
// AsyncWriteProxy message protocol
val replyTo = sender()
asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).flatMap { highSeqNr
val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1)
asyncReadHighestSequenceNr(persistenceId, readHighestSequenceNrFrom).flatMap { highSeqNr
if (highSeqNr == 0L || max == 0L)
Future.successful(highSeqNr)
else {