From 795bf1c3bf7a7f8cb260810accccf3fd6bf3783a Mon Sep 17 00:00:00 2001 From: Nicolas Vollmar Date: Tue, 7 Jan 2020 04:18:44 -0800 Subject: [PATCH] Keeps live stream open for empty persistence ids #28428 (#28429) --- .../leveldb/EventsByPersistenceIdStage.scala | 2 +- .../leveldb/EventsByPersistenceIdSpec.scala | 15 ++++++++++++++- .../query/journal/leveldb/EventsByTagSpec.scala | 9 +++++++++ 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala index b71d160ce6..3018924e3d 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala @@ -127,7 +127,7 @@ final private[akka] class EventsByPersistenceIdStage( nextSequenceNr, toSequenceNr, bufferSize) - if (bufferEmpty && (nextSequenceNr > toSequenceNr || nextSequenceNr == fromSequenceNr)) { + if (bufferEmpty && (nextSequenceNr > toSequenceNr || (nextSequenceNr == fromSequenceNr && isCurrentQuery()))) { completeStage() } else if (nextSequenceNr < toSequenceNr) { // need further requests to the journal diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala index 50bc98fb61..70db3ba583 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala @@ -205,6 +205,19 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi probe.expectNext().timestamp should be > 0L probe.cancel() } - } + "not complete for empty persistence id" in { + val src = queries.eventsByPersistenceId("o", 0L, Long.MaxValue) + val probe = + src.map(_.event).runWith(TestSink.probe[Any]).request(2) + + probe.expectNoMessage(200.millis) // must not complete + + val ref = setupEmpty("o") + ref ! "o-1" + expectMsg(s"o-1-done") + + probe.cancel() + } + } } diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala index db39ee66c4..b2c5170c2f 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala @@ -239,6 +239,15 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with probe.cancel() } + "not complete for empty stream" in { + val src = queries.eventsByTag(tag = "red", offset = NoOffset) + val probe = + src.map(_.event).runWith(TestSink.probe[Any]).request(2) + + probe.expectNoMessage(200.millis) + + probe.cancel() + } } }