!str - Moving the InputStream and OutputStream utilities into Source and Sink

This commit is contained in:
Viktor Klang 2015-11-17 13:17:30 +01:00
parent cb8d3c4609
commit 8780ba28a4
25 changed files with 432 additions and 310 deletions

View file

@ -52,7 +52,7 @@ class FileSourcesBenchmark {
@Setup
def setup() {
fileChannelSource = Source.file(file, bufSize)
fileInputStreamSource = InputStreamSource(() new FileInputStream(file), bufSize)
fileInputStreamSource = Source.inputStream(() new FileInputStream(file), bufSize)
ioSourceLinesIterator = Source(() scala.io.Source.fromFile(file).getLines()).map(ByteString(_))
}

View file

@ -2,6 +2,7 @@ package docs;
import akka.actor.Cancellable;
import akka.http.javadsl.model.Uri;
import akka.japi.function.Creator;
import akka.japi.Pair;
import akka.japi.function.Function;
import akka.stream.*;
@ -14,6 +15,9 @@ import scala.concurrent.Promise;
import scala.runtime.BoxedUnit;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;
import java.nio.charset.Charset;
@ -22,6 +26,15 @@ public class MigrationsJava {
// This is compile-only code, no need for actually running anything.
public static ActorMaterializer mat = null;
public static class SomeInputStream extends InputStream {
public SomeInputStream() {}
@Override public int read() throws IOException { return 0; }
}
public static class SomeOutputStream extends OutputStream {
@Override public void write(int b) throws IOException { return; }
}
public static void main(String[] args) {
Outlet<Integer> outlet = null;
@ -160,6 +173,47 @@ public class MigrationsJava {
final Sink<ByteString, Future<Long>> fileSink =
Sink.file(new File("."));
//#file-source-sink
//#input-output-stream-source-sink
final Source<ByteString, Future<java.lang.Long>> inputStreamSrc =
Source.inputStream(new Creator<InputStream>(){
public InputStream create() {
return new SomeInputStream();
}
});
final Source<ByteString, Future<java.lang.Long>> otherInputStreamSrc =
Source.inputStream(new Creator<InputStream>(){
public InputStream create() {
return new SomeInputStream();
}
}, 1024);
final Sink<ByteString, Future<java.lang.Long>> outputStreamSink =
Sink.outputStream(new Creator<OutputStream>(){
public OutputStream create() {
return new SomeOutputStream();
}
});
//#input-output-stream-source-sink
//#output-input-stream-source-sink
final FiniteDuration timeout = FiniteDuration.Zero();
final Source<ByteString, OutputStream> outputStreamSrc =
Source.outputStream();
final Source<ByteString, OutputStream> otherOutputStreamSrc =
Source.outputStream(timeout);
final Sink<ByteString, InputStream> someInputStreamSink =
Sink.inputStream();
final Sink<ByteString, InputStream> someOtherInputStreamSink =
Sink.inputStream(timeout);
//#output-input-stream-source-sink
}
}

View file

@ -424,17 +424,17 @@ should be replaced by:
SynchronousFileSource and SynchronousFileSink
============================================
Both have been replaced by `Source.file(…)` and `Sink.file(…)` due to discoverability issues
Both have been replaced by ``Source.file(…)`` and ``Sink.file(…)`` due to discoverability issues
paired with names which leaked internal implementation details.
Update procedure
----------------
Replace `SynchronousFileSource.create(` with `Source.file(`
Replace ``SynchronousFileSource.create(`` with ``Source.file(``
Replace `SynchronousFileSink.create(` with `Sink.file(`
Replace ``SynchronousFileSink.create(`` with ``Sink.file(``
Replace `SynchronousFileSink.appendTo(f)` with `Sink.file(f, true)`
Replace ``SynchronousFileSink.appendTo(f)`` with ``Sink.file(f, true)``
Example
^^^^^^^
@ -456,3 +456,86 @@ Example
should be replaced by
.. includecode:: code/docs/MigrationsJava.java#file-source-sink
InputStreamSource and OutputStreamSink
======================================
Both have been replaced by ``Source.inputStream(…)`` and ``Sink.outputStream(…)`` due to discoverability issues.
Update procedure
----------------
Replace ``InputStreamSource.create(`` with ``Source.inputStream(``
Replace ``OutputStreamSink.create(`` with ``Sink.outputStream(``
Example
^^^^^^^
::
// This no longer works!
final Source<ByteString, Future<java.lang.Long>> inputStreamSrc =
InputStreamSource.create(new Creator<InputStream>(){
public InputStream create() {
return new SomeInputStream();
}
});
// This no longer works!
final Source<ByteString, Future<java.lang.Long>> otherInputStreamSrc =
InputStreamSource.create(new Creator<InputStream>(){
public InputStream create() {
return new SomeInputStream();
}
}, 1024);
// This no longer works!
final Sink<ByteString, Future<java.lang.Long>> outputStreamSink =
OutputStreamSink.create(new Creator<OutputStream>(){
public OutputStream create() {
return new SomeOutputStream();
}
})
should be replaced by
.. includecode:: code/docs/MigrationsJava.java#input-output-stream-source-sink
OutputStreamSource and InputStreamSink
======================================
Both have been replaced by ``Source.outputStream(…)`` and ``Sink.inputStream(…)`` due to discoverability issues.
Update procedure
----------------
Replace ``OutputStreamSource.create(`` with ``Source.outputStream(``
Replace ``InputStreamSink.create(`` with ``Sink.inputStream(``
Example
^^^^^^^
::
// This no longer works!
final Source<ByteString, OutputStream> outputStreamSrc =
OutputStreamSource.create();
// This no longer works!
final Source<ByteString, OutputStream> otherOutputStreamSrc =
OutputStreamSource.create(timeout);
// This no longer works!
final Sink<ByteString, InputStream> someInputStreamSink =
InputStreamSink.create();
// This no longer works!
final Sink<ByteString, InputStream> someOtherInputStreamSink =
InputStreamSink.create(timeout);
should be replaced by
.. includecode:: code/docs/MigrationsJava.java#output-input-stream-source-sink

View file

@ -1,6 +1,6 @@
package docs
import java.io.File
import java.io.{ InputStream, File }
import akka.http.scaladsl.model.Uri
import akka.stream.scaladsl._
@ -218,6 +218,29 @@ class MigrationsScala extends AkkaSpec {
val someFileSink = Sink.file(new File("."))
//#file-source-sink
class SomeInputStream extends java.io.InputStream { override def read(): Int = 0 }
class SomeOutputStream extends java.io.OutputStream { override def write(b: Int): Unit = () }
//#input-output-stream-source-sink
val inputStreamSrc = Source.inputStream(() => new SomeInputStream())
val otherInputStreamSrc = Source.inputStream(() => new SomeInputStream())
val someOutputStreamSink = Sink.outputStream(() => new SomeOutputStream())
//#input-output-stream-source-sink
//#output-input-stream-source-sink
val timeout: FiniteDuration = 0.seconds
val outputStreamSrc = Source.outputStream()
val otherOutputStreamSrc = Source.outputStream(timeout)
val someInputStreamSink = Sink.inputStream()
val someOtherInputStreamSink = Sink.inputStream(timeout)
//#output-input-stream-source-sink
}
}
}

View file

@ -454,15 +454,15 @@ should be replaced by:
SynchronousFileSource and SynchronousFileSink
============================================
Both have been replaced by `Source.file(…)` and `Sink.file(…)` due to discoverability issues
Both have been replaced by ``Source.file(…)`` and ``Sink.file(…)`` due to discoverability issues
paired with names which leaked internal implementation details.
Update procedure
----------------
Replace `SynchronousFileSource(` and `SynchronousFileSource.apply(` with `Source.file(`
Replace ``SynchronousFileSource(`` and ``SynchronousFileSource.apply(`` with ``Source.file(``
Replace `SynchronousFileSink(` and `SynchronousFileSink.apply(` with `Sink.file(`
Replace ``SynchronousFileSink(`` and ``SynchronousFileSink.apply(`` with ``Sink.file(``
Example
^^^^^^^
@ -481,3 +481,66 @@ Example
should be replaced by
.. includecode:: code/docs/MigrationsScala.scala#file-source-sink
InputStreamSource and OutputStreamSink
============================================
Both have been replaced by ``Source.inputStream(…)`` and ``Sink.outputStream(…)`` due to discoverability issues.
Update procedure
----------------
Replace ``InputStreamSource(`` and ``InputStreamSource.apply(`` with ``Source.inputStream(``
Replace ``OutputStreamSink(`` and ``OutputStreamSink.apply(`` with ``Sink.outputStream(``
Example
^^^^^^^
::
// This no longer works!
val inputStreamSrc = InputStreamSource(() => new SomeInputStream())
// This no longer works!
val otherInputStreamSrc = InputStreamSource(() => new SomeInputStream(), 1024)
// This no longer works!
val someOutputStreamSink = OutputStreamSink(() => new SomeOutputStream())
should be replaced by
.. includecode:: code/docs/MigrationsScala.scala#input-output-stream-source-sink
OutputStreamSource and InputStreamSink
======================================
Both have been replaced by ``Source.outputStream(…)`` and ``Sink.inputStream(…)`` due to discoverability issues.
Update procedure
----------------
Replace ``OutputStreamSource(`` and ``OutputStreamSource.apply(`` with ``Source.outputStream(``
Replace ``InputStreamSink(`` and ``InputStreamSink.apply(`` with ``Sink.inputStream(``
Example
^^^^^^^
::
// This no longer works!
val outputStreamSrc = OutputStreamSource()
// This no longer works!
val otherOutputStreamSrc = OutputStreamSource(timeout)
// This no longer works!
val someInputStreamSink = InputStreamSink()
// This no longer works!
val someOtherInputStreamSink = InputStreamSink(timeout);
should be replaced by
.. includecode:: code/docs/MigrationsScala.scala#output-input-stream-source-sink

View file

@ -9,7 +9,6 @@ import java.io.File
import java.net.{ URI, URL }
import akka.stream.ActorAttributes
import akka.stream.io.{ InputStreamSource }
import akka.stream.scaladsl.Source
import scala.annotation.tailrec
@ -93,7 +92,7 @@ trait FileAndResourceDirectives {
extractSettings { settings
complete {
HttpEntity.Default(contentType, length,
InputStreamSource(() url.openStream())
Source.inputStream(() url.openStream())
.withAttributes(ActorAttributes.dispatcher(settings.fileIODispatcher)))
}
}

View file

@ -36,7 +36,7 @@ public class InputStreamSinkTest extends StreamTest {
public void mustReadEventViaInputStream() throws Exception {
final FiniteDuration timeout = FiniteDuration.create(300, TimeUnit.MILLISECONDS);
final Sink<ByteString, InputStream> sink = InputStreamSink.create(timeout);
final Sink<ByteString, InputStream> sink = Sink.inputStream(timeout);
final List<ByteString> list = Collections.singletonList(ByteString.fromString("a"));
final InputStream stream = Source.from(list).runWith(sink, materializer);

View file

@ -40,7 +40,7 @@ public class OutputStreamSourceTest extends StreamTest {
final FiniteDuration timeout = FiniteDuration.create(300, TimeUnit.MILLISECONDS);
final JavaTestKit probe = new JavaTestKit(system);
final Source<ByteString, OutputStream> source = OutputStreamSource.create(timeout);
final Source<ByteString, OutputStream> source = Source.outputStream(timeout);
final OutputStream s = source.to(Sink.foreach(new Procedure<ByteString>() {
public void apply(ByteString elem) {
probe.getRef().tell(elem, ActorRef.noSender());

View file

@ -80,7 +80,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
"InputStreamSink" must {
"read bytes from InputStream" in assertAllStagesStopped {
val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run()
val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run()
probe.sendNext(byteString)
val arr = newArray()
@ -113,7 +113,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
"returns less than was expected when the data source has provided some but not enough data" in assertAllStagesStopped {
val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run()
val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run()
val data = randomArray(2)
probe.sendNext(ByteString(data))
@ -126,7 +126,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
"block read until get requested number of bytes from upstream" in assertAllStagesStopped {
val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run()
val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run()
val arr = newArray()
val f = Future(inputStream.read(arr))
@ -141,7 +141,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
"fill up buffer by default" in assertAllStagesStopped {
import system.dispatcher
val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run()
val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run()
val array2 = randomArray(3)
probe.sendNext(byteString)
@ -162,7 +162,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
"throw error when reactive stream is closed" in assertAllStagesStopped {
val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run()
val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run()
probe.sendNext(byteString)
inputStream.close()
@ -188,7 +188,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
"return -1 when read after stream is completed" in assertAllStagesStopped {
val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run()
val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run()
probe.sendNext(byteString)
val arr = newArray()
@ -229,9 +229,9 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
val mat = ActorMaterializer()(sys)
try {
TestSource.probe[ByteString].runWith(InputStreamSink())(mat)
TestSource.probe[ByteString].runWith(Sink.inputStream())(mat)
mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "InputStreamSink").get
val ref = expectMsgType[Children].children.find(_.path.toString contains "inputStreamSink").get
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher")
} finally shutdown(sys)
}

View file

@ -5,7 +5,7 @@ package akka.stream.io
import java.io.InputStream
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.{ Source, Sink }
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
@ -19,7 +19,7 @@ class InputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) with ScalaF
"InputStreamSource" must {
"read bytes from InputStream" in assertAllStagesStopped {
val f = InputStreamSource(() new InputStream {
val f = Source.inputStream(() new InputStream {
@volatile var buf = List("a", "b", "c").map(_.charAt(0).toInt)
override def read(): Int = {
buf match {

View file

@ -5,7 +5,7 @@ package akka.stream.io
import java.io.OutputStream
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{ Source, Sink }
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
@ -26,7 +26,7 @@ class OutputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
val datas = List(ByteString("a"), ByteString("c"), ByteString("c"))
val completion = Source(datas)
.runWith(OutputStreamSink(() new OutputStream {
.runWith(Sink.outputStream(() new OutputStream {
override def write(i: Int): Unit = ()
override def write(bytes: Array[Byte]): Unit = p.ref ! ByteString(bytes).utf8String
}))
@ -40,7 +40,7 @@ class OutputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
"close underlying stream when error received" in assertAllStagesStopped {
val p = TestProbe()
Source.failed(new TE("Boom!"))
.runWith(OutputStreamSink(() new OutputStream {
.runWith(Sink.outputStream(() new OutputStream {
override def write(i: Int): Unit = ()
override def close() = p.ref ! "closed"
}))
@ -51,7 +51,7 @@ class OutputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
"close underlying stream when completion received" in assertAllStagesStopped {
val p = TestProbe()
Source.empty
.runWith(OutputStreamSink(() new OutputStream {
.runWith(Sink.outputStream(() new OutputStream {
override def write(i: Int): Unit = ()
override def write(bytes: Array[Byte]): Unit = p.ref ! ByteString(bytes).utf8String
override def close() = p.ref ! "closed"

View file

@ -70,7 +70,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
"OutputStreamSource" must {
"read bytes from OutputStream" in assertAllStagesStopped {
val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run
val (outputStream, probe) = Source.outputStream().toMat(TestSink.probe[ByteString])(Keep.both).run
val s = probe.expectSubscription()
outputStream.write(bytesArray)
@ -81,7 +81,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
"block flush call until send all buffer to downstream" in assertAllStagesStopped {
val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run
val (outputStream, probe) = Source.outputStream().toMat(TestSink.probe[ByteString])(Keep.both).run
val s = probe.expectSubscription()
outputStream.write(bytesArray)
@ -99,7 +99,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
"not block flushes when buffer is empty" in assertAllStagesStopped {
val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run
val (outputStream, probe) = Source.outputStream().toMat(TestSink.probe[ByteString])(Keep.both).run
val s = probe.expectSubscription()
outputStream.write(bytesArray)
@ -117,7 +117,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
"block writes when buffer is full" in assertAllStagesStopped {
val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both)
val (outputStream, probe) = Source.outputStream().toMat(TestSink.probe[ByteString])(Keep.both)
.withAttributes(Attributes.inputBuffer(16, 16)).run
val s = probe.expectSubscription()
@ -138,7 +138,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
"throw error when write after stream is closed" in assertAllStagesStopped {
val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run
val (outputStream, probe) = Source.outputStream().toMat(TestSink.probe[ByteString])(Keep.both).run
probe.expectSubscription()
outputStream.close()
@ -151,9 +151,9 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
val mat = ActorMaterializer()(sys)
try {
OutputStreamSource().runWith(TestSink.probe[ByteString])(mat)
Source.outputStream().runWith(TestSink.probe[ByteString])(mat)
mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "OutputStreamSource").get
val ref = expectMsgType[Children].children.find(_.path.toString contains "outputStreamSource").get
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher")
} finally shutdown(sys)

View file

@ -81,9 +81,9 @@ private[stream] object Stages {
val subscriberSource = name("subscriberSource")
val actorPublisherSource = name("actorPublisherSource")
val actorRefSource = name("actorRefSource")
val inputStreamSource = name("inputStreamSource")
val acknowledgeSource = name("acknowledgeSource")
val outputStreamSource = name("outputStreamSource")
val inputStreamSource = name("inputStreamSource") and IODispatcher
val outputStreamSource = name("outputStreamSource") and IODispatcher
val fileSource = name("fileSource") and IODispatcher
val subscriberSink = name("subscriberSink")
@ -95,9 +95,9 @@ private[stream] object Stages {
val ignoreSink = name("ignoreSink")
val actorRefSink = name("actorRefSink")
val actorSubscriberSink = name("actorSubscriberSink")
val outputStreamSink = name("outputStreamSink")
val acknowledgeSink = name("acknowledgeSink")
val inputStreamSink = name("inputStreamSink")
val outputStreamSink = name("outputStreamSink") and IODispatcher
val inputStreamSink = name("inputStreamSink") and IODispatcher
val fileSink = name("fileSource") and IODispatcher
}

View file

@ -33,7 +33,7 @@ private[akka] object InputStreamSinkStage {
/**
* INTERNAL API
*/
private[akka] class InputStreamSinkStage(timeout: FiniteDuration) extends SinkStage[ByteString, InputStream]("InputStreamSink") {
private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends SinkStage[ByteString, InputStream]("InputStreamSink") {
val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
@ -77,7 +77,7 @@ private[akka] class InputStreamSinkStage(timeout: FiniteDuration) extends SinkSt
}
})
}
(logic, new InputStreamAdapter(dataQueue, logic.wakeUp, timeout))
(logic, new InputStreamAdapter(dataQueue, logic.wakeUp, readTimeout))
}
}
@ -87,7 +87,7 @@ private[akka] class InputStreamSinkStage(timeout: FiniteDuration) extends SinkSt
*/
private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapterMessage],
sendToStage: (AdapterToStageMessage) Unit,
timeout: FiniteDuration)
readTimeout: FiniteDuration)
extends InputStream {
var isActive = true
@ -118,7 +118,7 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt
detachedChunk match {
case None
try {
sharedBuffer.poll(timeout.toMillis, TimeUnit.MILLISECONDS) match {
sharedBuffer.poll(readTimeout.toMillis, TimeUnit.MILLISECONDS) match {
case Data(data)
detachedChunk = Some(data)
readBytes(a, begin, length)

View file

@ -32,7 +32,7 @@ private[akka] object OutputStreamSourceStage {
}
}
private[akka] class OutputStreamSourceStage(timeout: FiniteDuration) extends SourceStage[ByteString, OutputStream]("OutputStreamSource") {
private[akka] class OutputStreamSourceStage(writeTimeout: FiniteDuration) extends SourceStage[ByteString, OutputStream]("OutputStreamSource") {
val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
@ -110,14 +110,14 @@ private[akka] class OutputStreamSourceStage(timeout: FiniteDuration) extends Sou
}
})
}
(logic, new OutputStreamAdapter(dataQueue, downstreamStatus, logic.wakeUp, timeout))
(logic, new OutputStreamAdapter(dataQueue, downstreamStatus, logic.wakeUp, writeTimeout))
}
}
private[akka] class OutputStreamAdapter(dataQueue: BlockingQueue[ByteString],
downstreamStatus: AtomicReference[DownstreamStatus],
sendToStage: (AdapterToStageMessage) Future[Unit],
timeout: FiniteDuration)
writeTimeout: FiniteDuration)
extends OutputStream {
var isActive = true
@ -148,7 +148,7 @@ private[akka] class OutputStreamAdapter(dataQueue: BlockingQueue[ByteString],
private[this] def sendMessage(message: AdapterToStageMessage, handleCancelled: Boolean = true) =
send(()
try {
Await.ready(sendToStage(message), timeout)
Await.ready(sendToStage(message), writeTimeout)
if (downstreamStatus.get() == Canceled && handleCancelled) {
//Publisher considered to be terminated at earliest convenience to minimize messages sending back and forth
isPublisherAlive = false

View file

@ -1,41 +0,0 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import scala.language.implicitConversions
/**
* Provides implicit conversions such that sources and sinks contained within `akka.stream.io`
* as if they were defined on [[Source]] or [[Sink]] directly.
*
* Example:
* {{{
* import akka.stream.scaladsl.Source
* import akka.stream.io._
*
* // explicitly using IO Source:
* FileSource(file).map(...)
*
* // using implicit conversion:
* import akka.stream.io.Implicits._
* Source.synchronousFile(file).map(...)
* }}}
*/
object Implicits {
// ---- Sources ----
implicit final class AddInputStreamSource(val s: Source.type) extends AnyVal {
def inputStream: InputStreamSource.type = InputStreamSource
}
// ---- Sinks ----
implicit final class AddOutputStreamSink(val s: Sink.type) extends AnyVal {
def outputStream: OutputStreamSink.type = OutputStreamSink
}
}

View file

@ -1,57 +0,0 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
import java.io.InputStream
import java.lang.{ Long JLong }
import akka.stream.impl.io.InputStreamSinkStage
import akka.stream.scaladsl.Sink
import akka.stream.{ Attributes, ActorAttributes, javadsl }
import akka.util.ByteString
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.language.postfixOps
/**
* Sink which allows to use [[java.io.InputStream]] to interact with reactive stream.
*/
object InputStreamSink {
/**
* Creates a synchronous (Java 6 compatible) Sink
*
* It materializes an [[java.io.InputStream]] to interacting with reactive stream.
*
* This sink is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`,
* unless configured otherwise by using [[ActorAttributes]].
*/
def apply(timeout: FiniteDuration = 5 seconds): Sink[ByteString, InputStream] =
Sink.fromGraph(new InputStreamSinkStage(timeout))
.withAttributes(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher") and
Attributes.name("InputStreamSink"))
/**
* Creates a synchronous (Java 6 compatible) Sink
*
* It materializes an [[java.io.InputStream]] to interacting with reactive stream.
*
* This sink is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`,
* unless configured otherwise by using [[akka.stream.ActorAttributes]].
*/
def create(): javadsl.Sink[ByteString, InputStream] =
new javadsl.Sink(apply())
/**
* Creates a synchronous (Java 6 compatible) Sink
*
* It materializes an [[java.io.InputStream]] to interacting with reactive stream.
*
* This sink is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`,
* unless configured otherwise by using [[akka.stream.ActorAttributes]].
*/
def create(timeout: FiniteDuration): javadsl.Sink[ByteString, InputStream] =
new javadsl.Sink(apply(timeout))
}

View file

@ -1,53 +0,0 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
import java.io.InputStream
import akka.japi.function.Creator
import akka.stream.impl.io.InputStreamSource
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Source._
import akka.stream.{ Attributes, javadsl }
import akka.util.ByteString
import scala.concurrent.Future
object InputStreamSource {
final val DefaultChunkSize = 8192
final val DefaultAttributes = Attributes.name("inputStreamSource")
/**
* Creates a Source that will pull data out of the given input stream.
* Emitted elements are `chunkSize` sized [[ByteString]] elements.
*
* It materializes a [[Future]] containing the number of bytes read from the source file upon completion.
*/
def apply(createInputStream: () InputStream, chunkSize: Int = DefaultChunkSize): Source[ByteString, Future[Long]] =
new Source(new InputStreamSource(createInputStream, chunkSize, DefaultAttributes, shape("InputStreamSource")))
/**
* Java API
*
* Creates a Source that will pull data out of the given input stream.
* Emitted elements are [[ByteString]] elements, chunked by default by [[DefaultChunkSize]] bytes.
*
* It materializes a [[Future]] containing the number of bytes read from the source file upon completion.
*/
def create(createInputStream: Creator[InputStream]): javadsl.Source[ByteString, Future[java.lang.Long]] =
create(createInputStream, DefaultChunkSize)
/**
* Java API
*
* Creates a Source that will pull data out of the given input stream.
* Emitted elements are `chunkSize` sized [[ByteString]] elements.
*
* It materializes a [[Future]] containing the number of bytes read from the source file upon completion.
*/
def create(createInputStream: Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] =
apply(() createInputStream.create(), chunkSize).asJava.asInstanceOf[javadsl.Source[ByteString, Future[java.lang.Long]]]
}

View file

@ -1,44 +0,0 @@
/**
* Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
import java.io.OutputStream
import akka.japi.function.Creator
import akka.stream.impl.io.OutputStreamSink
import akka.stream.scaladsl.Sink
import akka.stream.{ ActorAttributes, Attributes, javadsl }
import akka.util.ByteString
import scala.concurrent.Future
/**
* Sink which writes incoming [[ByteString]]s to the given [[OutputStream]].
*/
object OutputStreamSink {
final val DefaultAttributes = Attributes.name("outputStreamSink")
/**
* Sink which writes incoming [[ByteString]]s to the given [[OutputStream]].
*
* Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion.
*
* This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`,
* unless configured otherwise by using [[ActorAttributes]].
*/
def apply(output: () OutputStream): Sink[ByteString, Future[Long]] =
new Sink(new OutputStreamSink(output, DefaultAttributes, Sink.shape("OutputStreamSink")))
/**
* Java API
*
* Sink which writes incoming [[ByteString]]s to the given [[OutputStream]].
*
* Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion.
*/
def create(f: Creator[OutputStream]): javadsl.Sink[ByteString, Future[java.lang.Long]] =
apply(() f.create()).asJava.asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]]
}

View file

@ -1,60 +0,0 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
import java.io.OutputStream
import akka.stream.Attributes.Name
import akka.stream._
import akka.stream.impl.io.OutputStreamSourceStage
import akka.stream.scaladsl.{ FlowGraph, Source }
import akka.util.ByteString
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.implicitConversions
/**
* Source which allows to use [[java.io.OutputStream]] to interact with reactive stream.
*/
object OutputStreamSource {
import scala.language.postfixOps
/**
* Creates a synchronous (Java 6 compatible) Source.
*
* It materializes an [[java.io.OutputStream]] to interact with reactive stream.
*
* This source is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`,
* unless configured otherwise by using [[akka.stream.ActorAttributes]].
*/
def apply(timeout: FiniteDuration = 5.seconds): Source[ByteString, OutputStream] =
Source.fromGraph(new OutputStreamSourceStage(timeout))
.withAttributes(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher") and
Attributes.name("OutputStreamSource"))
/**
* Creates a synchronous (Java 6 compatible) Source.
*
* It materializes an [[java.io.OutputStream]] to interact with reactive stream.
*
* This source is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`,
* unless configured otherwise by using [[akka.stream.ActorAttributes]].
*/
def create(): javadsl.Source[ByteString, OutputStream] =
new javadsl.Source(apply())
/**
* Creates a synchronous (Java 6 compatible) Source.
*
* It materializes an [[java.io.OutputStream]] to interacting with reactive stream.
*
* This source is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`,
* unless configured otherwise by using [[akka.stream.ActorAttributes]].
*/
def create(timeout: FiniteDuration): javadsl.Source[ByteString, OutputStream] = {
new javadsl.Source(apply(timeout))
}
}

View file

@ -3,7 +3,7 @@
*/
package akka.stream.javadsl
import java.io.File
import java.io.{ InputStream, OutputStream, File }
import akka.actor.{ ActorRef, Props }
import akka.dispatch.ExecutionContexts
@ -182,6 +182,8 @@ object Sink {
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param f The file to write to
*/
def file(f: File): javadsl.Sink[ByteString, Future[java.lang.Long]] = file(f, append = false)
@ -193,10 +195,53 @@ object Sink {
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param f The file to write to
* @param append Whether or not the file should be overwritten or appended to
*/
def file(f: File, append: Boolean): javadsl.Sink[ByteString, Future[java.lang.Long]] =
new Sink(scaladsl.Sink.file(f, append)).asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]]
/**
* Sink which writes incoming [[ByteString]]s to an [[OutputStream]] created by the given function.
*
* Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param f A Creator which creates an OutputStream to write to
*/
def outputStream(f: function.Creator[OutputStream]): javadsl.Sink[ByteString, Future[java.lang.Long]] =
new Sink(scaladsl.Sink.outputStream(() f.create())).asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]]
/**
* Creates a Sink which when materialized will return an [[java.io.InputStream]] which it is possible
* to read the values produced by the stream this Sink is attached to.
*
* This method uses a default read timeout, use [[#inputStream(FiniteDuration)]] to explicitly
* configure the timeout.
*
* This Sink is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*/
def inputStream(): Sink[ByteString, InputStream] = new Sink(scaladsl.Sink.inputStream())
/**
* Creates a Sink which when materialized will return an [[java.io.InputStream]] which it is possible
* to read the values produced by the stream this Sink is attached to.
*
* This Sink is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param readTimeout the max time the read operation on the materialized InputStream should block
*/
def inputStream(readTimeout: FiniteDuration): Sink[ByteString, InputStream] =
new Sink(scaladsl.Sink.inputStream(readTimeout))
}
/**

View file

@ -3,7 +3,7 @@
*/
package akka.stream.javadsl
import java.io.File
import java.io.{ OutputStream, InputStream, File }
import akka.actor.{ ActorRef, Cancellable, Props }
import akka.event.LoggingAdapter
@ -260,8 +260,60 @@ object Source {
*
* It materializes a [[Future]] containing the number of bytes read from the source file upon completion.
*/
def file(f: File, chunkSize: Int): Source[ByteString, Future[java.lang.Long]] =
def file(f: File, chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] =
new Source(scaladsl.Source.file(f, chunkSize)).asInstanceOf[Source[ByteString, Future[java.lang.Long]]]
/**
* Creates a Source from an [[java.io.InputStream]] created by the given function.
* Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
* except the final element, which will be up to `chunkSize` in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[Future]] containing the number of bytes read from the source file upon completion.
*/
def inputStream(in: function.Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] =
new Source(scaladsl.Source.inputStream(() in.create(), chunkSize)).asInstanceOf[Source[ByteString, Future[java.lang.Long]]]
/**
* Creates a Source from an [[java.io.InputStream]] created by the given function.
* Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes,
* except the last element, which will be up to 8192 in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[Future]] containing the number of bytes read from the source file upon completion.
*/
def inputStream(in: function.Creator[InputStream]): javadsl.Source[ByteString, Future[java.lang.Long]] = inputStream(in, 8192)
/**
* Creates a Source which when materialized will return an [[java.io.OutputStream]] which it is possible
* to write the ByteStrings to the stream this Source is attached to.
*
* This Source is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param writeTimeout the max time the write operation on the materialized OutputStream should block
*/
def outputStream(writeTimeout: FiniteDuration): javadsl.Source[ByteString, OutputStream] =
new Source(scaladsl.Source.outputStream(writeTimeout))
/**
* Creates a Source which when materialized will return an [[java.io.OutputStream]] which it is possible
* to write the ByteStrings to the stream this Source is attached to. The write timeout for OutputStreams
* materialized will default to 5 seconds, @see [[#outputStream(FiniteDuration)]] if you want to override it.
*
* This Source is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*/
def outputStream(): javadsl.Source[ByteString, OutputStream] =
new Source(scaladsl.Source.outputStream())
}
/**

View file

@ -3,7 +3,7 @@
*/
package akka.stream.scaladsl
import java.io.File
import java.io.{ InputStream, OutputStream, File }
import akka.actor.{ ActorRef, Props }
import akka.dispatch.ExecutionContexts
@ -11,7 +11,7 @@ import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl._
import akka.stream.impl.io.FileSink
import akka.stream.impl.io.{ InputStreamSinkStage, OutputStreamSink, FileSink }
import akka.stream.stage.{ Context, PushStage, SyncDirective, TerminationDirective }
import akka.stream.{ javadsl, _ }
import akka.util.ByteString
@ -255,4 +255,29 @@ object Sink {
*/
def file(f: File, append: Boolean = false): Sink[ByteString, Future[Long]] =
new Sink(new FileSink(f, append, DefaultAttributes.fileSink, shape("FileSink")))
/**
* Creates a Sink which writes incoming [[ByteString]]s to an [[OutputStream]] created by the given function.
*
* Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*/
def outputStream(out: () OutputStream): Sink[ByteString, Future[Long]] =
new Sink(new OutputStreamSink(out, DefaultAttributes.outputStreamSink, shape("OutputStreamSink")))
/**
* Creates a Sink which when materialized will return an [[InputStream]] which it is possible
* to read the values produced by the stream this Sink is attached to.
*
* This Sink is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param readTimeout the max time the read operation on the materialized InputStream should block
*/
def inputStream(readTimeout: FiniteDuration = 5.seconds): Sink[ByteString, InputStream] =
Sink.fromGraph(new InputStreamSinkStage(readTimeout)).withAttributes(DefaultAttributes.inputStreamSink)
}

View file

@ -3,14 +3,14 @@
*/
package akka.stream.scaladsl
import java.io.File
import java.io.{ OutputStream, InputStream, File }
import akka.actor.{ ActorRef, Cancellable, Props }
import akka.stream.actor.ActorPublisher
import akka.stream.impl.Stages.{ DefaultAttributes, StageModule }
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.fusing.GraphStages.TickSource
import akka.stream.impl.io.FileSource
import akka.stream.impl.io.{ OutputStreamSourceStage, InputStreamSource, FileSource }
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, _ }
import akka.stream.{ Outlet, SourceShape, _ }
import akka.util.ByteString
@ -382,8 +382,41 @@ object Source {
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[Future]] containing the number of bytes read from the source file upon completion.
*
* @param f the File to read from
* @param chunkSize the size of each read operation, defaults to 8192
*/
def file(f: File, chunkSize: Int = 8192): Source[ByteString, Future[Long]] =
new Source(new FileSource(f, chunkSize, DefaultAttributes.fileSource, shape("FileSource")))
/**
* Creates a Source from an [[InputStream]] created by the given function.
* Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
* except the final element, which will be up to `chunkSize` in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[Future]] containing the number of bytes read from the source file upon completion.
*
* @param in a function which creates the InputStream to read from
* @param chunkSize the size of each read operation, defaults to 8192
*/
def inputStream(in: () InputStream, chunkSize: Int = 8192): Source[ByteString, Future[Long]] =
new Source(new InputStreamSource(in, chunkSize, DefaultAttributes.inputStreamSource, shape("InputStreamSource")))
/**
* Creates a Source which when materialized will return an [[OutputStream]] which it is possible
* to write the ByteStrings to the stream this Source is attached to.
*
* This Source is intended for inter-operation with legacy APIs since it is inherently blocking.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param writeTimeout the max time the write operation on the materialized OutputStream should block, defaults to 5 seconds
*/
def outputStream(writeTimeout: FiniteDuration = 5.seconds): Source[ByteString, OutputStream] =
Source.fromGraph(new OutputStreamSourceStage(writeTimeout)).withAttributes(DefaultAttributes.outputStreamSource)
}