Merge pull request #19590 from m-sp/master

+str #19589 Add autoFlush option to OutputStream Sinks
This commit is contained in:
Johan Andrén 2016-01-25 14:32:16 +01:00
commit 97df7bf689
4 changed files with 29 additions and 11 deletions

View file

@@ -45,7 +45,7 @@ private[akka] final class FileSink(f: File, append: Boolean, val attributes: Att
* Creates simple synchronous (Java 6 compatible) Sink which writes all incoming elements to the given file
* (creating it before hand if necessary).
*/
private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, val attributes: Attributes, shape: SinkShape[ByteString])
private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, val attributes: Attributes, shape: SinkShape[ByteString], autoFlush: Boolean)
extends SinkModule[ByteString, Future[IOResult]](shape) {
override def create(context: MaterializationContext) = {
@@ -55,16 +55,16 @@ private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, va
val os = createOutput() // if it fails, we fail the materialization
val props = OutputStreamSubscriber.props(os, ioResultPromise, settings.maxInputBufferSize)
val props = OutputStreamSubscriber.props(os, ioResultPromise, settings.maxInputBufferSize, autoFlush)
val ref = materializer.actorOf(context, props)
(akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future)
}
override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, Future[IOResult]] =
new OutputStreamSink(createOutput, attributes, shape)
new OutputStreamSink(createOutput, attributes, shape, autoFlush)
override def withAttributes(attr: Attributes): Module =
new OutputStreamSink(createOutput, attr, amendShape(attr))
new OutputStreamSink(createOutput, attr, amendShape(attr), autoFlush)
}

View file

@@ -16,15 +16,15 @@ import scala.util.{ Failure, Success }
/** INTERNAL API */
private[akka] object OutputStreamSubscriber {
def props(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int) = {
def props(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int, autoFlush: Boolean) = {
require(bufSize > 0, "buffer size must be > 0")
Props(classOf[OutputStreamSubscriber], os, completionPromise, bufSize).withDeploy(Deploy.local)
Props(classOf[OutputStreamSubscriber], os, completionPromise, bufSize, autoFlush).withDeploy(Deploy.local)
}
}
/** INTERNAL API */
private[akka] class OutputStreamSubscriber(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int)
private[akka] class OutputStreamSubscriber(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int, autoFlush: Boolean)
extends akka.stream.actor.ActorSubscriber
with ActorLogging {
@@ -38,6 +38,7 @@ private[akka] class OutputStreamSubscriber(os: OutputStream, completionPromise:
// blocking write
os.write(bytes.toArray)
bytesWritten += bytes.length
if (autoFlush) os.flush()
} catch {
case ex: Exception ⇒
completionPromise.success(IOResult(bytesWritten, Failure(ex)))

View file

@@ -24,10 +24,26 @@ object StreamConverters {
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* This method does not auto-flush the [[java.io.OutputStream]]; see [[#fromOutputStream(function.Creator, Boolean)]] if you want to enable it.
*
* @param f A Creator which creates an OutputStream to write to
*/
def fromOutputStream(f: function.Creator[OutputStream]): javadsl.Sink[ByteString, CompletionStage[IOResult]] =
new Sink(scaladsl.StreamConverters.fromOutputStream(() ⇒ f.create()).toCompletionStage())
def fromOutputStream(f: function.Creator[OutputStream]): javadsl.Sink[ByteString, CompletionStage[IOResult]] = fromOutputStream(f, autoFlush = false)
/**
* Sink which writes incoming [[ByteString]]s to an [[OutputStream]] created by the given function.
*
* Materializes a [[CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the stream's completion,
* and a possible exception if IO operation was not completed successfully.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param f A Creator which creates an OutputStream to write to
* @param autoFlush If true the OutputStream will be flushed whenever a byte array is written
*/
def fromOutputStream(f: function.Creator[OutputStream], autoFlush: Boolean): javadsl.Sink[ByteString, CompletionStage[IOResult]] =
new Sink(scaladsl.StreamConverters.fromOutputStream(() ⇒ f.create(), autoFlush).toCompletionStage())
/**
* Creates a Sink which when materialized will return an [[java.io.InputStream]] which it is possible

View file

@@ -61,9 +61,10 @@ object StreamConverters {
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
* If `autoFlush` is true the OutputStream will be flushed whenever a byte array is written, defaults to false.
*/
def fromOutputStream(out: () ⇒ OutputStream): Sink[ByteString, Future[IOResult]] =
new Sink(new OutputStreamSink(out, DefaultAttributes.outputStreamSink, sinkShape("OutputStreamSink")))
def fromOutputStream(out: () ⇒ OutputStream, autoFlush: Boolean = false): Sink[ByteString, Future[IOResult]] =
new Sink(new OutputStreamSink(out, DefaultAttributes.outputStreamSink, sinkShape("OutputStreamSink"), autoFlush))
/**
* Creates a Sink which when materialized will return an [[InputStream]] which it is possible