diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeByteStrings.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeByteStrings.java index 81ff223327..f9c7255473 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeByteStrings.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeByteStrings.java @@ -100,8 +100,15 @@ public class RecipeByteStrings extends RecipeTest { @Override public void onUpstreamFinish() throws Exception { if (buffer.isEmpty()) completeStage(); - // elements left in buffer, keep accepting downstream pulls - // and push from buffer until buffer is emitted + else { + // There are elements left in buffer, so + // we keep accepting downstream pulls and push from buffer until emptied. + // + // It might be though, that the upstream finished while it was pulled, in which + // case we will not get an onPull from the downstream, because we already had one. + // In that case we need to emit from the buffer. + if (isAvailable(out)) emitChunk(); + } } }); } diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala index 4c5f1d356e..8cea3af8d8 100644 --- a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala +++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala @@ -42,8 +42,15 @@ class RecipeByteStrings extends RecipeSpec { override def onUpstreamFinish(): Unit = { if (buffer.isEmpty) completeStage() - // elements left in buffer, keep accepting downstream pulls - // and push from buffer until buffer is emitted + else { + // There are elements left in buffer, so + // we keep accepting downstream pulls and push from buffer until emptied. + // + // It might be though, that the upstream finished while it was pulled, in which + // case we will not get an onPull from the downstream, because we already had one. + // In that case we need to emit from the buffer. + if (isAvailable(out)) emitChunk() + } } }) diff --git a/project/MiMa.scala b/project/MiMa.scala index 07112a6c53..4d61603f6d 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -903,7 +903,10 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.pubsub.protobuf.msg.DistributedPubSubMessages#StatusOrBuilder.hasReplyToStatus"), // #20543 GraphStage subtypes should not be private to akka - ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.stream.ActorMaterializer.actorOf") + ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.stream.ActorMaterializer.actorOf"), + + // Interpreter internals change + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.stage.GraphStageLogic.portToConn") ), "2.4.9" -> Seq( // #20994 adding new decode method, since we're on JDK7+ now