diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index f634a29436..24f698eeaf 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -375,11 +375,17 @@ completes. Note that a flow can be materialized multiple times, so the function producing the ``OutputStream`` must be able to handle multiple invocations. +The ``OutputStream`` will be closed when the stream that flows into the ``Sink`` is completed, and the ``Sink`` +will cancel its inflow when the ``OutputStream`` is no longer writable. + asInputStream ^^^^^^^^^^^^^ Create a sink which materializes into an ``InputStream`` that can be read to trigger demand through the sink. Bytes emitted through the stream will be available for reading through the ``InputStream`` +The ``InputStream`` will be ended when the stream flowing into this ``Sink`` completes, and the closing the +``InputStream`` will cancel the inflow of this ``Sink``. + fromInputStream ^^^^^^^^^^^^^^^ Create a source that wraps an ``InputStream``. Takes a function that produces an ``InputStream``, when the source is @@ -391,12 +397,16 @@ completes. Note that a flow can be materialized multiple times, so the function producing the ``InputStream`` must be able to handle multiple invocations. +The ``InputStream`` will be closed when the ``Source`` is canceled from its downstream, and reaching the end of the +``InputStream`` will complete the ``Source``. + asOutputStream ^^^^^^^^^^^^^^ Create a source that materializes into an ``OutputStream``. When bytes are written to the ``OutputStream`` they are emitted from the source - +The ``OutputStream`` will no longer be writable when the ``Source`` has been canceled from its downstream, and +closing the ``OutputStream`` will complete the ``Source``. File IO Sinks and Sources ------------------------- diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index b74e092fdf..38693731ec 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -364,11 +364,17 @@ completes. Note that a flow can be materialized multiple times, so the function producing the ``OutputStream`` must be able to handle multiple invocations. +The ``OutputStream`` will be closed when the stream that flows into the ``Sink`` is completed, and the ``Sink`` +will cancel its inflow when the ``OutputStream`` is no longer writable. + asInputStream ^^^^^^^^^^^^^ Create a sink which materializes into an ``InputStream`` that can be read to trigger demand through the sink. Bytes emitted through the stream will be available for reading through the ``InputStream`` +The ``InputStream`` will be ended when the stream flowing into this ``Sink`` completes, and the closing the +``InputStream`` will cancel the inflow of this ``Sink``. + fromInputStream ^^^^^^^^^^^^^^^ Create a source that wraps an ``InputStream``. Takes a function that produces an ``InputStream``, when the source is @@ -380,12 +386,16 @@ completes. Note that a flow can be materialized multiple times, so the function producing the ``InputStream`` must be able to handle multiple invocations. +The ``InputStream`` will be closed when the ``Source`` is canceled from its downstream, and reaching the end of the +``InputStream`` will complete the ``Source``. + asOutputStream ^^^^^^^^^^^^^^ Create a source that materializes into an ``OutputStream``. When bytes are written to the ``OutputStream`` they are emitted from the source - +The ``OutputStream`` will no longer be writable when the ``Source`` has been canceled from its downstream, and +closing the ``OutputStream`` will complete the ``Source``. File IO Sinks and Sources ------------------------- diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala index 18e7c0a76e..fa9009636a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala @@ -26,6 +26,9 @@ object StreamConverters { * * This method uses no auto flush for the [[java.io.OutputStream]] @see [[#fromOutputStream(function.Creator, Boolean)]] if you want to override it. * + * The [[OutputStream]] will be closed when the stream flowing into this [[Sink]] is completed. The [[Sink]] + * will cancel the stream when the [[OutputStream]] is no longer writable. + * * @param f A Creator which creates an OutputStream to write to */ def fromOutputStream(f: function.Creator[OutputStream]): javadsl.Sink[ByteString, CompletionStage[IOResult]] = fromOutputStream(f, autoFlush = false) @@ -39,6 +42,9 @@ object StreamConverters { * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * + * The [[OutputStream]] will be closed when the stream flowing into this [[Sink]] is completed. The [[Sink]] + * will cancel the stream when the [[OutputStream]] is no longer writable. + * * @param f A Creator which creates an OutputStream to write to * @param autoFlush If true the OutputStream will be flushed whenever a byte array is written */ @@ -56,6 +62,9 @@ object StreamConverters { * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. + * + * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and + * closing the [[InputStream]] will cancel this [[Sink]]. */ def asInputStream(): Sink[ByteString, InputStream] = new Sink(scaladsl.StreamConverters.asInputStream()) @@ -68,6 +77,9 @@ object StreamConverters { * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * + * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and + * closing the [[InputStream]] will cancel this [[Sink]]. + * * @param readTimeout the max time the read operation on the materialized InputStream should block */ def asInputStream(readTimeout: FiniteDuration): Sink[ByteString, InputStream] = @@ -82,6 +94,8 @@ object StreamConverters { * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[CompletionStage]] containing the number of bytes read from the source file upon completion. + * + * The created [[InputStream]] will be closed when the [[Source]] is cancelled. */ def fromInputStream(in: function.Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, CompletionStage[IOResult]] = new Source(scaladsl.StreamConverters.fromInputStream(() ⇒ in.create(), chunkSize).toCompletionStage()) @@ -96,6 +110,8 @@ object StreamConverters { * * It materializes a [[CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, * and a possible exception if IO operation was not completed successfully. + * + * The created [[InputStream]] will be closed when the [[Source]] is cancelled. */ def fromInputStream(in: function.Creator[InputStream]): javadsl.Source[ByteString, CompletionStage[IOResult]] = fromInputStream(in, 8192) @@ -108,6 +124,9 @@ object StreamConverters { * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * + * The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]] + * will complete this [[Source]]. + * * @param writeTimeout the max time the write operation on the materialized OutputStream should block */ def asOutputStream(writeTimeout: FiniteDuration): javadsl.Source[ByteString, OutputStream] = @@ -122,6 +141,9 @@ object StreamConverters { * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. + * + * The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]] + * will complete this [[Source]]. */ def asOutputStream(): javadsl.Source[ByteString, OutputStream] = new Source(scaladsl.StreamConverters.asOutputStream()) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala index 95d82441f3..39bb32f240 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala @@ -33,6 +33,8 @@ object StreamConverters { * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, * and a possible exception if IO operation was not completed successfully. * + * The created [[InputStream]] will be closed when the [[Source]] is cancelled. + * * @param in a function which creates the InputStream to read from * @param chunkSize the size of each read operation, defaults to 8192 */ @@ -48,6 +50,9 @@ object StreamConverters { * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * + * The created [[OutputStream]] will be closed when the [[Source]] is cancelled, and closing the [[OutputStream]] + * will complete this [[Source]]. + * * @param writeTimeout the max time the write operation on the materialized OutputStream should block, defaults to 5 seconds */ def asOutputStream(writeTimeout: FiniteDuration = 5.seconds): Source[ByteString, OutputStream] = @@ -62,6 +67,9 @@ object StreamConverters { * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * If `autoFlush` is true the OutputStream will be flushed whenever a byte array is written, defaults to false. + * + * The [[OutputStream]] will be closed when the stream flowing into this [[Sink]] is completed. The [[Sink]] + * will cancel the stream when the [[OutputStream]] is no longer writable. */ def fromOutputStream(out: () ⇒ OutputStream, autoFlush: Boolean = false): Sink[ByteString, Future[IOResult]] = new Sink(new OutputStreamSink(out, DefaultAttributes.outputStreamSink, sinkShape("OutputStreamSink"), autoFlush)) @@ -75,6 +83,9 @@ object StreamConverters { * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * + * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and + * closing the [[InputStream]] will cancel this [[Sink]]. + * * @param readTimeout the max time the read operation on the materialized InputStream should block */ def asInputStream(readTimeout: FiniteDuration = 5.seconds): Sink[ByteString, InputStream] =