fix StreamRefs IllegalStateException (#29432)
* Test SinkRefStage in the CumulativeDemand-after-UpstreamFinish scenario
This commit is contained in:
parent
68fc503c4c
commit
58fa1e3604
2 changed files with 36 additions and 1 deletions
|
|
@ -433,6 +433,41 @@ class StreamRefsSpec extends AkkaSpec(StreamRefsSpec.config()) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"not die to a slow and eager subscriber" in {
|
||||||
|
import akka.stream.impl.streamref.StreamRefsProtocol._
|
||||||
|
|
||||||
|
// GIVEN: remoteActor delivers 2 elements "hello", "world"
|
||||||
|
val remoteProbe = TestProbe()(remoteSystem)
|
||||||
|
remoteActor.tell("give", remoteProbe.ref)
|
||||||
|
val sourceRefImpl = remoteProbe.expectMsgType[SourceRefImpl[String]]
|
||||||
|
|
||||||
|
val sourceRefStageProbe = TestProbe("sourceRefStageProbe")
|
||||||
|
|
||||||
|
// WHEN: SourceRefStage sends a first CumulativeDemand with enough demand to consume the whole stream
|
||||||
|
sourceRefStageProbe.send(sourceRefImpl.initialPartnerRef, CumulativeDemand(10))
|
||||||
|
|
||||||
|
// THEN: stream established with OnSubscribeHandshake
|
||||||
|
val onSubscribeHandshake = sourceRefStageProbe.expectMsgType[OnSubscribeHandshake]
|
||||||
|
val sinkRefStageActorRef = watch(onSubscribeHandshake.targetRef)
|
||||||
|
|
||||||
|
// THEN: all elements are streamed to SourceRefStage
|
||||||
|
sourceRefStageProbe.expectMsg(SequencedOnNext(0, "hello"))
|
||||||
|
sourceRefStageProbe.expectMsg(SequencedOnNext(1, "world"))
|
||||||
|
sourceRefStageProbe.expectMsg(RemoteStreamCompleted(2))
|
||||||
|
|
||||||
|
// WHEN: SinkRefStage receives another CumulativeDemand, due to latency in network or slowness of sourceRefStage
|
||||||
|
sourceRefStageProbe.send(sinkRefStageActorRef, CumulativeDemand(10))
|
||||||
|
|
||||||
|
// THEN: SinkRefStage should not terminate
|
||||||
|
expectNoMessage()
|
||||||
|
|
||||||
|
// WHEN: SourceRefStage terminates
|
||||||
|
system.stop(sourceRefStageProbe.ref)
|
||||||
|
|
||||||
|
// THEN: SinkRefStage should terminate
|
||||||
|
expectTerminated(sinkRefStageActorRef)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"A SinkRef" must {
|
"A SinkRef" must {
|
||||||
|
|
|
||||||
|
|
@ -196,7 +196,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn
|
||||||
}
|
}
|
||||||
|
|
||||||
private def tryPull(): Unit =
|
private def tryPull(): Unit =
|
||||||
if (remoteCumulativeDemandConsumed < remoteCumulativeDemandReceived && !hasBeenPulled(in)) {
|
if (remoteCumulativeDemandConsumed < remoteCumulativeDemandReceived && !hasBeenPulled(in) && !isClosed(in)) {
|
||||||
pull(in)
|
pull(in)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue