diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala index 2e72f62672..db504616dc 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdPublisher.scala @@ -181,7 +181,7 @@ private[akka] class CurrentEventsByPersistenceIdPublisher( deliverBuf() if (highestSeqNr < toSequenceNr) toSeqNr = highestSeqNr - if (highestSeqNr == 0L || (buf.isEmpty && currSeqNo > toSequenceNr)) + if (buf.isEmpty && (currSeqNo > toSequenceNr || currSeqNo == fromSequenceNr)) onCompleteThenStop() else self ! Continue // more to fetch diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala index acf314d3a1..22518fbbb4 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagPublisher.scala @@ -183,7 +183,7 @@ private[akka] class CurrentEventsByTagPublisher( deliverBuf() if (highestSeqNr < toOffset) _toOffset = highestSeqNr - if (highestSeqNr == 0L || (buf.isEmpty && currOffset > toOffset)) + if (buf.isEmpty && (currOffset > toOffset || currOffset == fromOffset)) onCompleteThenStop() else self ! Continue // more to fetch diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala index c842d37c30..f2a45ba64a 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala @@ -33,7 +33,7 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier) def setup(persistenceId: String): ActorRef = { - val ref = system.actorOf(TestActor.props(persistenceId)) + val ref = setupEmpty(persistenceId) ref ! s"$persistenceId-1" ref ! s"$persistenceId-2" ref ! s"$persistenceId-3" @@ -43,6 +43,10 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi ref } + def setupEmpty(persistenceId: String): ActorRef = { + system.actorOf(TestActor.props(persistenceId)) + } + "Leveldb query EventsByPersistenceId" must { "implement standard EventsByTagQuery" in { @@ -88,6 +92,65 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi .expectNext("f-3") .expectComplete() // f-4 not seen } + + "return empty stream for cleaned journal from 0 to MaxLong" in { + val ref = setup("g1") + + ref ! TestActor.DeleteCmd(3L) + expectMsg(s"${3L}-deleted") + + val src = queries.currentEventsByPersistenceId("g1", 0L, Long.MaxValue) + src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete() + } + + "return empty stream for cleaned journal from 0 to 0" in { + val ref = setup("g2") + + ref ! TestActor.DeleteCmd(3L) + expectMsg(s"${3L}-deleted") + + val src = queries.currentEventsByPersistenceId("g2", 0L, 0L) + src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete() + } + + "return remaining values after partial journal cleanup" in { + val ref = setup("h") + + ref ! TestActor.DeleteCmd(2L) + expectMsg(s"${2L}-deleted") + + val src = queries.currentEventsByPersistenceId("h", 0L, Long.MaxValue) + src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectNext("h-3") expectComplete() + } + + "return empty stream for empty journal" in { + val ref = setupEmpty("i") + + val src = queries.currentEventsByPersistenceId("i", 0L, Long.MaxValue) + src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete() + } + + "return empty stream for journal from 0 to 0" in { + val ref = setup("k1") + + val src = queries.currentEventsByPersistenceId("k1", 0L, 0L) + src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete() + } + + "return empty stream for empty journal from 0 to 0" in { + val ref = setupEmpty("k2") + + val src = queries.currentEventsByPersistenceId("k2", 0L, 0L) + src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete() + } + + "return empty stream for journal from seqNo greater than highestSeqNo" in { + val ref = setup("l") + + val src = queries.currentEventsByPersistenceId("l", 4L, 3L) + src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete() + } + } "Leveldb live query EventsByPersistenceId" must { diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/TestActor.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/TestActor.scala index f27e2f553c..5eec6123aa 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/TestActor.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/TestActor.scala @@ -9,15 +9,23 @@ import akka.actor.Props object TestActor { def props(persistenceId: String): Props = Props(new TestActor(persistenceId)) + + case class DeleteCmd(toSeqNr: Long = Long.MaxValue) } class TestActor(override val persistenceId: String) extends PersistentActor { + import TestActor.DeleteCmd + val receiveRecover: Receive = { case evt: String ⇒ } val receiveCommand: Receive = { + case DeleteCmd(toSeqNr) ⇒ + deleteMessages(toSeqNr) + sender() ! s"$toSeqNr-deleted" + case cmd: String ⇒ persist(cmd) { evt ⇒ sender() ! evt + "-done"