document that StreamConverters close their streams #19773

fixes #19773
Roland Kuhn 2016-02-16 11:10:54 +01:00
parent 5a5f52c047
commit a25a0f0aa8
4 changed files with 55 additions and 2 deletions


@@ -375,11 +375,17 @@ completes.
Note that a flow can be materialized multiple times, so the function producing the ``OutputStream`` must be able
to handle multiple invocations.
The ``OutputStream`` will be closed when the stream that flows into the ``Sink`` is completed, and the ``Sink``
will cancel its inflow when the ``OutputStream`` is no longer writable.
asInputStream
^^^^^^^^^^^^^
Create a sink which materializes into an ``InputStream`` that can be read to trigger demand through the sink.
Bytes emitted through the stream will be available for reading through the ``InputStream``
The ``InputStream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the
``InputStream`` will cancel the inflow of this ``Sink``.
fromInputStream
^^^^^^^^^^^^^^^
Create a source that wraps an ``InputStream``. Takes a function that produces an ``InputStream``, when the source is
@@ -391,12 +397,16 @@ completes.
Note that a flow can be materialized multiple times, so the function producing the ``InputStream`` must be able
to handle multiple invocations.
The ``InputStream`` will be closed when the ``Source`` is canceled from its downstream, and reaching the end of the
``InputStream`` will complete the ``Source``.
asOutputStream
^^^^^^^^^^^^^^
Create a source that materializes into an ``OutputStream``. When bytes are written to the ``OutputStream`` they
are emitted from the source
The ``OutputStream`` will no longer be writable when the ``Source`` has been canceled from its downstream, and
closing the ``OutputStream`` will complete the ``Source``.
File IO Sinks and Sources
-------------------------


@@ -364,11 +364,17 @@ completes.
Note that a flow can be materialized multiple times, so the function producing the ``OutputStream`` must be able
to handle multiple invocations.
The ``OutputStream`` will be closed when the stream that flows into the ``Sink`` is completed, and the ``Sink``
will cancel its inflow when the ``OutputStream`` is no longer writable.
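A minimal usage sketch of these semantics (assuming an implicit ``ActorMaterializer`` is in scope; the ``ByteArrayOutputStream`` is just a stand-in target):

.. code-block:: scala

   import java.io.ByteArrayOutputStream
   import scala.concurrent.Future
   import akka.stream.IOResult
   import akka.stream.scaladsl.{ Source, StreamConverters }
   import akka.util.ByteString

   val target = new ByteArrayOutputStream()
   // The OutputStream is closed once the incoming stream completes; if it stops
   // being writable, the Sink cancels its inflow instead.
   val result: Future[IOResult] =
     Source.single(ByteString("hello"))
       .runWith(StreamConverters.fromOutputStream(() => target))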
asInputStream
^^^^^^^^^^^^^
Create a sink which materializes into an ``InputStream`` that can be read to trigger demand through the sink.
Bytes emitted through the stream will be available for reading through the ``InputStream``
The ``InputStream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the
``InputStream`` will cancel the inflow of this ``Sink``.
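A minimal sketch (assuming an implicit ``ActorMaterializer`` is in scope):

.. code-block:: scala

   import java.io.InputStream
   import akka.stream.scaladsl.{ Source, StreamConverters }
   import akka.util.ByteString

   // The materialized value of the sink is the InputStream to read from.
   val in: InputStream =
     Source.single(ByteString("data"))
       .runWith(StreamConverters.asInputStream())

   val firstByte = in.read() // reading pulls data through the sink
   in.close()                // closing cancels the sink's inflow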
fromInputStream
^^^^^^^^^^^^^^^
Create a source that wraps an ``InputStream``. Takes a function that produces an ``InputStream``, when the source is
@@ -380,12 +386,16 @@ completes.
Note that a flow can be materialized multiple times, so the function producing the ``InputStream`` must be able
to handle multiple invocations.
The ``InputStream`` will be closed when the ``Source`` is canceled from its downstream, and reaching the end of the
``InputStream`` will complete the ``Source``.
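A minimal sketch (assuming an implicit ``ActorMaterializer`` is in scope; the ``ByteArrayInputStream`` is just a stand-in source of bytes):

.. code-block:: scala

   import java.io.ByteArrayInputStream
   import scala.concurrent.Future
   import akka.stream.IOResult
   import akka.stream.scaladsl.{ Sink, StreamConverters }

   // The creator function may be invoked once per materialization.
   val source = StreamConverters.fromInputStream(
     () => new ByteArrayInputStream("payload".getBytes("UTF-8")))

   // Reaching the end of the InputStream completes the Source;
   // cancellation from downstream closes the InputStream.
   val ioResult: Future[IOResult] = source.to(Sink.ignore).run()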
asOutputStream
^^^^^^^^^^^^^^
Create a source that materializes into an ``OutputStream``. When bytes are written to the ``OutputStream`` they
are emitted from the source
The ``OutputStream`` will no longer be writable when the ``Source`` has been canceled from its downstream, and
closing the ``OutputStream`` will complete the ``Source``.
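A minimal sketch (assuming an implicit ``ActorMaterializer`` is in scope):

.. code-block:: scala

   import akka.stream.scaladsl.{ Keep, Sink, StreamConverters }

   // Materializing yields the OutputStream; bytes written to it are emitted downstream.
   val (os, done) =
     StreamConverters.asOutputStream()
       .map(_.utf8String)
       .toMat(Sink.foreach[String](println))(Keep.both)
       .run()

   os.write("emitted downstream".getBytes("UTF-8"))
   os.close() // closing the OutputStream completes the Source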
File IO Sinks and Sources
-------------------------


@@ -26,6 +26,9 @@ object StreamConverters {
*
* This method uses no auto flush for the [[java.io.OutputStream]] @see [[#fromOutputStream(function.Creator, Boolean)]] if you want to override it.
*
* The [[OutputStream]] will be closed when the stream flowing into this [[Sink]] is completed. The [[Sink]]
* will cancel the stream when the [[OutputStream]] is no longer writable.
*
* @param f A Creator which creates an OutputStream to write to
*/
def fromOutputStream(f: function.Creator[OutputStream]): javadsl.Sink[ByteString, CompletionStage[IOResult]] = fromOutputStream(f, autoFlush = false)
@@ -39,6 +42,9 @@ object StreamConverters {
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* The [[OutputStream]] will be closed when the stream flowing into this [[Sink]] is completed. The [[Sink]]
* will cancel the stream when the [[OutputStream]] is no longer writable.
*
* @param f A Creator which creates an OutputStream to write to
* @param autoFlush If true the OutputStream will be flushed whenever a byte array is written
*/
@@ -56,6 +62,9 @@ object StreamConverters {
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and
* closing the [[InputStream]] will cancel this [[Sink]].
*/
def asInputStream(): Sink[ByteString, InputStream] = new Sink(scaladsl.StreamConverters.asInputStream())
@@ -68,6 +77,9 @@ object StreamConverters {
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and
* closing the [[InputStream]] will cancel this [[Sink]].
*
* @param readTimeout the max time the read operation on the materialized InputStream should block
*/
def asInputStream(readTimeout: FiniteDuration): Sink[ByteString, InputStream] =
@@ -82,6 +94,8 @@ object StreamConverters {
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[CompletionStage]] containing the number of bytes read from the source file upon completion.
*
* The created [[InputStream]] will be closed when the [[Source]] is cancelled.
*/
def fromInputStream(in: function.Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, CompletionStage[IOResult]] =
new Source(scaladsl.StreamConverters.fromInputStream(() ⇒ in.create(), chunkSize).toCompletionStage())
@@ -96,6 +110,8 @@ object StreamConverters {
*
* It materializes a [[CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
* and a possible exception if IO operation was not completed successfully.
*
* The created [[InputStream]] will be closed when the [[Source]] is cancelled.
*/
def fromInputStream(in: function.Creator[InputStream]): javadsl.Source[ByteString, CompletionStage[IOResult]] = fromInputStream(in, 8192)
@@ -108,6 +124,9 @@ object StreamConverters {
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]]
* will complete this [[Source]].
*
* @param writeTimeout the max time the write operation on the materialized OutputStream should block
*/
def asOutputStream(writeTimeout: FiniteDuration): javadsl.Source[ByteString, OutputStream] =
@@ -122,6 +141,9 @@ object StreamConverters {
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]]
* will complete this [[Source]].
*/
def asOutputStream(): javadsl.Source[ByteString, OutputStream] =
new Source(scaladsl.StreamConverters.asOutputStream())


@@ -33,6 +33,8 @@ object StreamConverters {
* It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
* and a possible exception if IO operation was not completed successfully.
*
* The created [[InputStream]] will be closed when the [[Source]] is cancelled.
*
* @param in a function which creates the InputStream to read from
* @param chunkSize the size of each read operation, defaults to 8192
*/
@@ -48,6 +50,9 @@ object StreamConverters {
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]]
* will complete this [[Source]].
*
* @param writeTimeout the max time the write operation on the materialized OutputStream should block, defaults to 5 seconds
*/
def asOutputStream(writeTimeout: FiniteDuration = 5.seconds): Source[ByteString, OutputStream] =
@@ -62,6 +67,9 @@ object StreamConverters {
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
* If `autoFlush` is true the OutputStream will be flushed whenever a byte array is written, defaults to false.
*
* The [[OutputStream]] will be closed when the stream flowing into this [[Sink]] is completed. The [[Sink]]
* will cancel the stream when the [[OutputStream]] is no longer writable.
*/
def fromOutputStream(out: () ⇒ OutputStream, autoFlush: Boolean = false): Sink[ByteString, Future[IOResult]] =
new Sink(new OutputStreamSink(out, DefaultAttributes.outputStreamSink, sinkShape("OutputStreamSink"), autoFlush))
@@ -75,6 +83,9 @@ object StreamConverters {
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and
* closing the [[InputStream]] will cancel this [[Sink]].
*
* @param readTimeout the max time the read operation on the materialized InputStream should block
*/
def asInputStream(readTimeout: FiniteDuration = 5.seconds): Sink[ByteString, InputStream] =