Merge pull request #17900 from 2beaucoup/fileandresource-streams
Use Sources from akka.stream.io
This commit is contained in:
commit
6d7c60eb44
2 changed files with 9 additions and 36 deletions
|
|
@ -173,38 +173,6 @@ private[http] object StreamUtils {
|
|||
def mapEntityError(f: Throwable ⇒ Throwable): RequestEntity ⇒ RequestEntity =
|
||||
_.transformDataBytes(mapErrorTransformer(f))
|
||||
|
||||
/**
|
||||
* Simple blocking Source backed by an InputStream.
|
||||
*
|
||||
* FIXME: should be provided by akka-stream, see #15588
|
||||
*/
|
||||
def fromInputStreamSource(inputStream: InputStream,
|
||||
fileIODispatcher: String,
|
||||
defaultChunkSize: Int = 65536): Source[ByteString, Unit] = {
|
||||
val onlyOnceFlag = new AtomicBoolean(false)
|
||||
|
||||
val iterator = new Iterator[ByteString] {
|
||||
var finished = false
|
||||
if (onlyOnceFlag.get() || !onlyOnceFlag.compareAndSet(false, true))
|
||||
throw new IllegalStateException("One time source can only be instantiated once")
|
||||
|
||||
def hasNext: Boolean = !finished
|
||||
|
||||
def next(): ByteString =
|
||||
if (!finished) {
|
||||
val buffer = new Array[Byte](defaultChunkSize)
|
||||
val read = inputStream.read(buffer)
|
||||
if (read < 0) {
|
||||
finished = true
|
||||
inputStream.close()
|
||||
ByteString.empty
|
||||
} else ByteString.fromArray(buffer, 0, read)
|
||||
} else ByteString.empty
|
||||
}
|
||||
|
||||
Source(() ⇒ iterator).withAttributes(ActorAttributes.dispatcher(fileIODispatcher))
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a source that can only be used once for testing purposes.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -5,8 +5,11 @@
|
|||
package akka.http.scaladsl.server
|
||||
package directives
|
||||
|
||||
import java.io.{ File, FileInputStream }
|
||||
import java.net.URL
|
||||
import java.io.File
|
||||
import java.net.{ URI, URL }
|
||||
|
||||
import akka.stream.ActorAttributes
|
||||
import akka.stream.io.{ InputStreamSource, SynchronousFileSource }
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import akka.actor.ActorSystem
|
||||
|
|
@ -51,7 +54,8 @@ trait FileAndResourceDirectives {
|
|||
extractSettings { settings ⇒
|
||||
complete {
|
||||
HttpEntity.Default(contentType, file.length,
|
||||
StreamUtils.fromInputStreamSource(new FileInputStream(file), settings.fileIODispatcher))
|
||||
SynchronousFileSource(file)
|
||||
.withAttributes(ActorAttributes.dispatcher(settings.fileIODispatcher)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -88,7 +92,8 @@ trait FileAndResourceDirectives {
|
|||
extractSettings { settings ⇒
|
||||
complete {
|
||||
HttpEntity.Default(contentType, length,
|
||||
StreamUtils.fromInputStreamSource(url.openStream(), settings.fileIODispatcher))
|
||||
InputStreamSource(() ⇒ url.openStream())
|
||||
.withAttributes(ActorAttributes.dispatcher(settings.fileIODispatcher)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue