diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index 136c27fc1c..837ff9fad8 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -4,8 +4,6 @@ package akka.http.model -import akka.http.util.FastFuture - import language.implicitConversions import java.io.File import java.lang.{ Iterable ⇒ JIterable } @@ -265,7 +263,7 @@ object HttpEntity { sentLastChunk = true Chunk(byteTransformer.onTermination(None).join) :: l :: Nil } - + override def onError(cause: scala.Throwable): Unit = byteTransformer.onError(cause) override def onTermination(e: Option[Throwable]): immutable.Seq[ChunkStreamPart] = { val remaining = if (e.isEmpty && !sentLastChunk) byteTransformer.onTermination(None) @@ -275,6 +273,8 @@ object HttpEntity { if (remaining.nonEmpty) Chunk(remaining.join) :: Nil else Nil } + + override def cleanup(): Unit = byteTransformer.cleanup() }) HttpEntity.Chunked(contentType, newChunks) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala index 6f009bfd45..41facc07f5 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala @@ -47,7 +47,7 @@ object Sink { * A `Sink` that immediately cancels its upstream after materialization. */ def cancelled[T]: Drain[T] = CancelDrain - + private def createSinkFromBuilder[T](builder: FlowGraphBuilder, block: FlowGraphBuilder ⇒ UndefinedSource[T]): Sink[T] = { val in = block(builder) builder.partialBuild().toSink(in)