diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala index b71d160ce6..3018924e3d 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala @@ -127,7 +127,7 @@ final private[akka] class EventsByPersistenceIdStage( nextSequenceNr, toSequenceNr, bufferSize) - if (bufferEmpty && (nextSequenceNr > toSequenceNr || nextSequenceNr == fromSequenceNr)) { + if (bufferEmpty && (nextSequenceNr > toSequenceNr || (nextSequenceNr == fromSequenceNr && isCurrentQuery()))) { completeStage() } else if (nextSequenceNr < toSequenceNr) { // need further requests to the journal diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala index 50bc98fb61..70db3ba583 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala @@ -205,6 +205,19 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi probe.expectNext().timestamp should be > 0L probe.cancel() } - } + "not complete for empty persistence id" in { + val src = queries.eventsByPersistenceId("o", 0L, Long.MaxValue) + val probe = + src.map(_.event).runWith(TestSink.probe[Any]).request(2) + + probe.expectNoMessage(200.millis) // must not complete + + val ref = setupEmpty("o") + ref ! "o-1" + expectMsg(s"o-1-done") + + probe.cancel() + } + } } diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala index db39ee66c4..b2c5170c2f 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala @@ -239,6 +239,15 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with probe.cancel() } + "not complete for empty stream" in { + val src = queries.eventsByTag(tag = "red", offset = NoOffset) + val probe = + src.map(_.event).runWith(TestSink.probe[Any]).request(2) + + probe.expectNoMessage(200.millis) + + probe.cancel() + } } }