diff --git a/akka-docs/rst/java/code/docs/stream/BidiFlowDocTest.java b/akka-docs/rst/java/code/docs/stream/BidiFlowDocTest.java index 3c266a6215..27eb549e8c 100644 --- a/akka-docs/rst/java/code/docs/stream/BidiFlowDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/BidiFlowDocTest.java @@ -165,9 +165,12 @@ public class BidiFlowDocTest extends AbstractJavaTest { @Override public void onUpstreamFinish() throws Exception { + // either we are done if (stash.isEmpty()) completeStage(); + // or we still have bytes to emit // wait with completion and let run() complete when the // rest of the stash has been sent downstream + else if (isAvailable(out)) run(); } }); diff --git a/akka-docs/rst/scala/code/docs/stream/BidiFlowDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/BidiFlowDocSpec.scala index d69e1ca39b..f8bd88e7ba 100644 --- a/akka-docs/rst/scala/code/docs/stream/BidiFlowDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/BidiFlowDocSpec.scala @@ -93,9 +93,12 @@ object BidiFlowDocSpec { } override def onUpstreamFinish(): Unit = { + // either we are done if (stash.isEmpty) completeStage() + // or we still have bytes to emit // wait with completion and let run() complete when the // rest of the stash has been sent downstream + else if (isAvailable(out)) run() } })