Keeps live stream open for empty persistence ids #28428 (#28429)

This commit is contained in:
Nicolas Vollmar 2020-01-07 04:18:44 -08:00 committed by Johan Andrén
parent efa856bc17
commit 795bf1c3bf
3 changed files with 24 additions and 2 deletions

View file

@ -127,7 +127,7 @@ final private[akka] class EventsByPersistenceIdStage(
nextSequenceNr,
toSequenceNr,
bufferSize)
if (bufferEmpty && (nextSequenceNr > toSequenceNr || nextSequenceNr == fromSequenceNr)) {
if (bufferEmpty && (nextSequenceNr > toSequenceNr || (nextSequenceNr == fromSequenceNr && isCurrentQuery()))) {
completeStage()
} else if (nextSequenceNr < toSequenceNr) {
// need further requests to the journal

View file

@ -205,6 +205,19 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi
probe.expectNext().timestamp should be > 0L
probe.cancel()
}
}
"not complete for empty persistence id" in {
val src = queries.eventsByPersistenceId("o", 0L, Long.MaxValue)
val probe =
src.map(_.event).runWith(TestSink.probe[Any]).request(2)
probe.expectNoMessage(200.millis) // must not complete
val ref = setupEmpty("o")
ref ! "o-1"
expectMsg(s"o-1-done")
probe.cancel()
}
}
}

View file

@ -239,6 +239,15 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with
probe.cancel()
}
"not complete for empty stream" in {
val src = queries.eventsByTag(tag = "red", offset = NoOffset)
val probe =
src.map(_.event).runWith(TestSink.probe[Any]).request(2)
probe.expectNoMessage(200.millis)
probe.cancel()
}
}
}