Chunker should only pull if buffer is empty (#24019)
For the current Chunker if the upstream sends larger messages than the chunk size the buffer can slowly fill up until the app OOMs. Change it so it only pulls if the buffer is empty
This commit is contained in:
parent
d3b625616a
commit
eede5533ac
2 changed files with 2 additions and 8 deletions
|
|
@ -23,14 +23,12 @@ class RecipeByteStrings extends RecipeSpec {
|
|||
val in = Inlet[ByteString]("Chunker.in")
|
||||
val out = Outlet[ByteString]("Chunker.out")
|
||||
override val shape = FlowShape.of(in, out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
|
||||
private var buffer = ByteString.empty
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
if (isClosed(in)) emitChunk()
|
||||
else pull(in)
|
||||
emitChunk()
|
||||
}
|
||||
})
|
||||
setHandler(in, new InHandler {
|
||||
|
|
@ -64,7 +62,6 @@ class RecipeByteStrings extends RecipeSpec {
|
|||
push(out, chunk)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -72,7 +69,6 @@ class RecipeByteStrings extends RecipeSpec {
|
|||
//#bytestring-chunker
|
||||
|
||||
val chunksFuture = chunksStream.limit(10).runWith(Sink.seq)
|
||||
|
||||
val chunks = Await.result(chunksFuture, 3.seconds)
|
||||
|
||||
chunks.forall(_.size <= 2) should be(true)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue