diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index 3007330cd0..83277829e9 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -173,38 +173,6 @@ private[http] object StreamUtils { def mapEntityError(f: Throwable ⇒ Throwable): RequestEntity ⇒ RequestEntity = _.transformDataBytes(mapErrorTransformer(f)) - /** - * Simple blocking Source backed by an InputStream. - * - * FIXME: should be provided by akka-stream, see #15588 - */ - def fromInputStreamSource(inputStream: InputStream, - fileIODispatcher: String, - defaultChunkSize: Int = 65536): Source[ByteString, Unit] = { - val onlyOnceFlag = new AtomicBoolean(false) - - val iterator = new Iterator[ByteString] { - var finished = false - if (onlyOnceFlag.get() || !onlyOnceFlag.compareAndSet(false, true)) - throw new IllegalStateException("One time source can only be instantiated once") - - def hasNext: Boolean = !finished - - def next(): ByteString = - if (!finished) { - val buffer = new Array[Byte](defaultChunkSize) - val read = inputStream.read(buffer) - if (read < 0) { - finished = true - inputStream.close() - ByteString.empty - } else ByteString.fromArray(buffer, 0, read) - } else ByteString.empty - } - - Source(() ⇒ iterator).withAttributes(ActorAttributes.dispatcher(fileIODispatcher)) - } - /** * Returns a source that can only be used once for testing purposes. */ diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala index 3405295164..0420f9050d 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala @@ -5,8 +5,11 @@ package akka.http.scaladsl.server package directives -import java.io.{ File, FileInputStream } -import java.net.URL +import java.io.File +import java.net.{ URI, URL } + +import akka.stream.ActorAttributes +import akka.stream.io.{ InputStreamSource, SynchronousFileSource } import scala.annotation.tailrec import akka.actor.ActorSystem @@ -51,7 +54,8 @@ trait FileAndResourceDirectives { extractSettings { settings ⇒ complete { HttpEntity.Default(contentType, file.length, - StreamUtils.fromInputStreamSource(new FileInputStream(file), settings.fileIODispatcher)) + SynchronousFileSource(file) + .withAttributes(ActorAttributes.dispatcher(settings.fileIODispatcher))) } } } @@ -88,7 +92,8 @@ trait FileAndResourceDirectives { extractSettings { settings ⇒ complete { HttpEntity.Default(contentType, length, - StreamUtils.fromInputStreamSource(url.openStream(), settings.fileIODispatcher)) + InputStreamSource(() ⇒ url.openStream()) + .withAttributes(ActorAttributes.dispatcher(settings.fileIODispatcher))) } } }