From eede5533ace5a9598e0e5974cc78c51ba3e3eaed Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Wed, 14 Feb 2018 16:18:58 +0000 Subject: [PATCH] Chunker should only pull if buffer is empty (#24019) For the current Chunker if the upstream sends larger messages than the chunk size the buffer can slowly fill up until the app OOMs. Change it so it only pulls if the buffer is empty --- .../jdocs/stream/javadsl/cookbook/RecipeByteStrings.java | 4 +--- .../test/scala/docs/stream/cookbook/RecipeByteStrings.scala | 6 +----- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeByteStrings.java b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeByteStrings.java index d00d61418a..e3bddaa75b 100644 --- a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeByteStrings.java +++ b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeByteStrings.java @@ -82,10 +82,8 @@ public class RecipeByteStrings extends RecipeTest { setHandler(out, new AbstractOutHandler(){ @Override public void onPull() throws Exception { - if (isClosed(in)) emitChunk(); - else pull(in); + emitChunk(); } - }); setHandler(in, new AbstractInHandler() { diff --git a/akka-docs/src/test/scala/docs/stream/cookbook/RecipeByteStrings.scala b/akka-docs/src/test/scala/docs/stream/cookbook/RecipeByteStrings.scala index 8cea3af8d8..e465efbf0b 100644 --- a/akka-docs/src/test/scala/docs/stream/cookbook/RecipeByteStrings.scala +++ b/akka-docs/src/test/scala/docs/stream/cookbook/RecipeByteStrings.scala @@ -23,14 +23,12 @@ class RecipeByteStrings extends RecipeSpec { val in = Inlet[ByteString]("Chunker.in") val out = Outlet[ByteString]("Chunker.out") override val shape = FlowShape.of(in, out) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { private var buffer = ByteString.empty setHandler(out, new OutHandler { override def onPull(): Unit = { - if (isClosed(in)) emitChunk() - else pull(in) + emitChunk() } }) setHandler(in, new InHandler { @@ -64,7 +62,6 @@ class RecipeByteStrings extends RecipeSpec { push(out, chunk) } } - } } @@ -72,7 +69,6 @@ class RecipeByteStrings extends RecipeSpec { //#bytestring-chunker val chunksFuture = chunksStream.limit(10).runWith(Sink.seq) - val chunks = Await.result(chunksFuture, 3.seconds) chunks.forall(_.size <= 2) should be(true)