From e27a7d92a9b95f6e4a936ce364ac8c9cdab71963 Mon Sep 17 00:00:00 2001 From: Matthias Sperl Date: Sat, 23 Jan 2016 14:28:16 +0100 Subject: [PATCH] +str #19589 Add autoFlush option to OutputStream Sinks --- .../scala/akka/stream/impl/io/IOSinks.scala | 8 ++++---- .../impl/io/OutputStreamSubscriber.scala | 7 ++++--- .../stream/javadsl/StreamConverters.scala | 20 +++++++++++++++++-- .../stream/scaladsl/StreamConverters.scala | 5 +++-- 4 files changed, 29 insertions(+), 11 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index 6ed6012765..4adb8dbd01 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -45,7 +45,7 @@ private[akka] final class FileSink(f: File, append: Boolean, val attributes: Att * Creates simple synchronous (Java 6 compatible) Sink which writes all incoming elements to the given file * (creating it before hand if necessary). */ -private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, val attributes: Attributes, shape: SinkShape[ByteString]) +private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, val attributes: Attributes, shape: SinkShape[ByteString], autoFlush: Boolean) extends SinkModule[ByteString, Future[IOResult]](shape) { override def create(context: MaterializationContext) = { @@ -55,16 +55,16 @@ private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, va val os = createOutput() // if it fails, we fail the materialization - val props = OutputStreamSubscriber.props(os, ioResultPromise, settings.maxInputBufferSize) + val props = OutputStreamSubscriber.props(os, ioResultPromise, settings.maxInputBufferSize, autoFlush) val ref = materializer.actorOf(context, props) (akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future) } override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, Future[IOResult]] = - new OutputStreamSink(createOutput, attributes, shape) + new OutputStreamSink(createOutput, attributes, shape, autoFlush) override def withAttributes(attr: Attributes): Module = - new OutputStreamSink(createOutput, attr, amendShape(attr)) + new OutputStreamSink(createOutput, attr, amendShape(attr), autoFlush) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala index 87d0b9d057..0aa30ee176 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala @@ -16,15 +16,15 @@ import scala.util.{ Failure, Success } /** INTERNAL API */ private[akka] object OutputStreamSubscriber { - def props(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int) = { + def props(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int, autoFlush: Boolean) = { require(bufSize > 0, "buffer size must be > 0") - Props(classOf[OutputStreamSubscriber], os, completionPromise, bufSize).withDeploy(Deploy.local) + Props(classOf[OutputStreamSubscriber], os, completionPromise, bufSize, autoFlush).withDeploy(Deploy.local) } } /** INTERNAL API */ -private[akka] class OutputStreamSubscriber(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int) +private[akka] class OutputStreamSubscriber(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int, autoFlush: Boolean) extends akka.stream.actor.ActorSubscriber with ActorLogging { @@ -38,6 +38,7 @@ private[akka] class OutputStreamSubscriber(os: OutputStream, completionPromise: // blocking write os.write(bytes.toArray) bytesWritten += bytes.length + if (autoFlush) os.flush() } catch { case ex: Exception ⇒ completionPromise.success(IOResult(bytesWritten, Failure(ex))) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala index b890b980b3..f8b4ff374a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala @@ -24,10 +24,26 @@ object StreamConverters { * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * + * This method uses no auto flush for the [[java.io.OutputStream]] @see [[#fromOutputStream(function.Creator, Boolean)]] if you want to override it. + * * @param f A Creator which creates an OutputStream to write to */ - def fromOutputStream(f: function.Creator[OutputStream]): javadsl.Sink[ByteString, CompletionStage[IOResult]] = - new Sink(scaladsl.StreamConverters.fromOutputStream(() ⇒ f.create()).toCompletionStage()) + def fromOutputStream(f: function.Creator[OutputStream]): javadsl.Sink[ByteString, CompletionStage[IOResult]] = fromOutputStream(f, autoFlush = false) + + /** + * Sink which writes incoming [[ByteString]]s to an [[OutputStream]] created by the given function. + * + * Materializes a [[CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, + * and a possible exception if IO operation was not completed successfully. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param f A Creator which creates an OutputStream to write to + * @param autoFlush If true the OutputStream will be flushed whenever a byte array is written + */ + def fromOutputStream(f: function.Creator[OutputStream], autoFlush: Boolean): javadsl.Sink[ByteString, CompletionStage[IOResult]] = + new Sink(scaladsl.StreamConverters.fromOutputStream(() ⇒ f.create(), autoFlush).toCompletionStage()) /** * Creates a Sink which when materialized will return an [[java.io.InputStream]] which it is possible diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala index db34ba79a4..a3ab7af2e7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala @@ -61,9 +61,10 @@ object StreamConverters { * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. + * If `autoFlush` is true the OutputStream will be flushed whenever a byte array is written, defaults to false. */ - def fromOutputStream(out: () ⇒ OutputStream): Sink[ByteString, Future[IOResult]] = - new Sink(new OutputStreamSink(out, DefaultAttributes.outputStreamSink, sinkShape("OutputStreamSink"))) + def fromOutputStream(out: () ⇒ OutputStream, autoFlush: Boolean = false): Sink[ByteString, Future[IOResult]] = + new Sink(new OutputStreamSink(out, DefaultAttributes.outputStreamSink, sinkShape("OutputStreamSink"), autoFlush)) /** * Creates a Sink which when materialized will return an [[InputStream]] which it is possible