From 8ea52a6bb453517f4bd2bcf8dbf6e82fa81fa413 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Sun, 12 Jul 2015 23:04:26 -0400 Subject: [PATCH] +str #17338 add OutputStreamSource and InputStreamSink --- akka-docs-dev/rst/java/stream-io.rst | 2 +- .../docs/stream/io/StreamFileDocSpec.scala | 2 +- akka-docs-dev/rst/scala/stream-io.rst | 2 +- akka-http/src/main/resources/reference.conf | 2 +- .../akka/stream/io/InputStreamSinkTest.java | 48 ++++ .../stream/io/OutputStreamSourceTest.java | 56 ++++ .../akka/stream/io/InputStreamSinkSpec.scala | 241 ++++++++++++++++++ .../stream/io/OutputStreamSourceSpec.scala | 179 +++++++++++++ .../stream/io/SynchronousFileSinkSpec.scala | 4 +- .../stream/io/SynchronousFileSourceSpec.scala | 4 +- .../stream/scaladsl/GraphBackedFlowSpec.scala | 7 +- akka-stream/src/main/resources/reference.conf | 4 +- .../main/scala/akka/stream/impl/Stages.scala | 2 + .../akka/stream/impl/io/IOSettings.scala | 7 +- .../scala/akka/stream/impl/io/IOSinks.scala | 6 +- .../scala/akka/stream/impl/io/IOSources.scala | 15 +- .../stream/impl/io/InputStreamSinkStage.scala | 200 +++++++++++++++ .../impl/io/OutputStreamSourceStage.scala | 178 +++++++++++++ .../akka/stream/io/InputStreamSink.scala | 57 +++++ .../akka/stream/io/OutputStreamSink.scala | 2 +- .../akka/stream/io/OutputStreamSource.scala | 60 +++++ .../akka/stream/io/SynchronousFileSink.scala | 6 +- .../stream/io/SynchronousFileSource.scala | 6 +- .../scala/akka/stream/stage/GraphStage.scala | 18 ++ 24 files changed, 1076 insertions(+), 32 deletions(-) create mode 100644 akka-stream-tests/src/test/java/akka/stream/io/InputStreamSinkTest.java create mode 100644 akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java create mode 100644 akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala create mode 100644 akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala create mode 100644 akka-stream/src/main/scala/akka/stream/io/InputStreamSink.scala create mode 100644 akka-stream/src/main/scala/akka/stream/io/OutputStreamSource.scala diff --git a/akka-docs-dev/rst/java/stream-io.rst b/akka-docs-dev/rst/java/stream-io.rst index d478b6d8ef..71f6a8b9c3 100644 --- a/akka-docs-dev/rst/java/stream-io.rst +++ b/akka-docs-dev/rst/java/stream-io.rst @@ -118,7 +118,7 @@ Streaming data from a file is as easy as defining a `SynchronousFileSource` give Please note that these processing stages are backed by Actors and by default are configured to run on a pre-configured threadpool-backed dispatcher dedicated for File IO. This is very important as it isolates the blocking file IO operations from the rest of the ActorSystem allowing each dispatcher to be utilised in the most efficient way. If you want to configure a custom -dispatcher for file IO operations globally, you can do so by changing the ``akka.stream.file-io-dispatcher``, +dispatcher for file IO operations globally, you can do so by changing the ``akka.stream.blocking-io-dispatcher``, or for a specific stage by specifying a custom Dispatcher in code, like this: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/io/StreamFileDocTest.java#custom-dispatcher-code diff --git a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala index 6b3543799d..4979d30e2b 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala @@ -57,7 +57,7 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) { "configure dispatcher in code" in { //#custom-dispatcher-code SynchronousFileSink(file) - .withAttributes(ActorAttributes.dispatcher("custom-file-io-dispatcher")) + .withAttributes(ActorAttributes.dispatcher("custom-blocking-io-dispatcher")) //#custom-dispatcher-code } diff --git a/akka-docs-dev/rst/scala/stream-io.rst b/akka-docs-dev/rst/scala/stream-io.rst index 5f4a3b9176..1cb32c044a 100644 --- a/akka-docs-dev/rst/scala/stream-io.rst +++ b/akka-docs-dev/rst/scala/stream-io.rst @@ -118,7 +118,7 @@ Streaming data from a file is as easy as defining a `SynchronousFileSource` give Please note that these processing stages are backed by Actors and by default are configured to run on a pre-configured threadpool-backed dispatcher dedicated for File IO. This is very important as it isolates the blocking file IO operations from the rest of the ActorSystem allowing each dispatcher to be utilised in the most efficient way. If you want to configure a custom -dispatcher for file IO operations globally, you can do so by changing the ``akka.stream.file-io-dispatcher``, +dispatcher for file IO operations globally, you can do so by changing the ``akka.stream.blocking-io-dispatcher``, or for a specific stage by specifying a custom Dispatcher in code, like this: .. includecode:: code/docs/stream/io/StreamFileDocSpec.scala#custom-dispatcher-code diff --git a/akka-http/src/main/resources/reference.conf b/akka-http/src/main/resources/reference.conf index eba81d60a1..87a3f24d1e 100644 --- a/akka-http/src/main/resources/reference.conf +++ b/akka-http/src/main/resources/reference.conf @@ -39,5 +39,5 @@ akka.http.routing { # Fully qualified config path which holds the dispatcher configuration # to be used by FlowMaterialiser when creating Actors for IO operations. - file-io-dispatcher = ${akka.stream.file-io-dispatcher} + file-io-dispatcher = ${akka.stream.blocking-io-dispatcher} } diff --git a/akka-stream-tests/src/test/java/akka/stream/io/InputStreamSinkTest.java b/akka-stream-tests/src/test/java/akka/stream/io/InputStreamSinkTest.java new file mode 100644 index 0000000000..875e9137e7 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/io/InputStreamSinkTest.java @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io; + +import akka.japi.Pair; +import akka.stream.StreamTest; +import akka.stream.javadsl.AkkaJUnitActorSystemResource; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.stream.testkit.AkkaSpec; +import akka.stream.testkit.Utils; +import akka.util.ByteString; +import org.junit.ClassRule; +import org.junit.Test; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +public class InputStreamSinkTest extends StreamTest { + public InputStreamSinkTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("InputStreamSink", + Utils.UnboundedMailboxConfig()); + @Test + public void mustReadEventViaInputStream() throws Exception { + final FiniteDuration timeout = FiniteDuration.create(300, TimeUnit.MILLISECONDS); + + final Sink sink = InputStreamSink.create(timeout); + final List list = Collections.singletonList(ByteString.fromString("a")); + final InputStream stream = Source.from(list).runWith(sink, materializer); + + byte[] a = new byte[1]; + stream.read(a); + assertTrue(Arrays.equals("a".getBytes(), a)); + } + +} diff --git a/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java b/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java new file mode 100644 index 0000000000..ca5682c054 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io; + +import akka.actor.ActorRef; +import akka.japi.Pair; +import akka.japi.function.Procedure; +import akka.stream.StreamTest; +import akka.stream.javadsl.AkkaJUnitActorSystemResource; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.stream.testkit.AkkaSpec; +import akka.stream.testkit.Utils; +import akka.testkit.JavaTestKit; +import akka.util.ByteString; +import com.typesafe.config.ConfigFactory; +import org.junit.ClassRule; +import org.junit.Test; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; +import scala.runtime.BoxedUnit; + +import java.io.OutputStream; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class OutputStreamSourceTest extends StreamTest { + public OutputStreamSourceTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("OutputStreamSource", + Utils.UnboundedMailboxConfig()); + @Test + public void mustSendEventsViaOutputStream() throws Exception { + final FiniteDuration timeout = FiniteDuration.create(300, TimeUnit.MILLISECONDS); + final JavaTestKit probe = new JavaTestKit(system); + + final Source source = OutputStreamSource.create(timeout); + final OutputStream s = source.to(Sink.foreach(new Procedure() { + public void apply(ByteString elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + })).run(materializer); + + s.write("a".getBytes()); + assertEquals(ByteString.fromString("a"), probe.receiveOne(timeout)); + s.close(); + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala new file mode 100644 index 0000000000..66f31eb1df --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala @@ -0,0 +1,241 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io + +import java.io.{ IOException, InputStream } +import java.util.concurrent.TimeoutException + +import akka.actor.{ ActorSystem, NoSerializationVerificationNeeded } +import akka.stream._ +import akka.stream.impl.StreamSupervisor.Children +import akka.stream.impl.io.InputStreamSinkStage +import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor } +import akka.stream.scaladsl.{ Keep, Sink } +import akka.stream.stage.InHandler +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.TestSource +import akka.testkit.TestProbe +import akka.util.ByteString + +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Future } +import scala.util.Random +import scala.util.control.NoStackTrace + +class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { + import system.dispatcher + + val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") + implicit val materializer = ActorMaterializer(settings) + + val timeout = 300.milliseconds + def randomArray(size: Int) = { + val a = new Array[Byte](size) + Random.nextBytes(a) + a + } + + val byteArray = randomArray(3) + val byteString = ByteString(byteArray) + + def newArray() = new Array[Byte](3) + + def expectSuccess[T](f: Future[T], value: T) = + Await.result(f, timeout) should be(value) + + object InputStreamSinkTestMessages { + case object Push extends NoSerializationVerificationNeeded + case object Finish extends NoSerializationVerificationNeeded + case class Failure(ex: Throwable) extends NoSerializationVerificationNeeded + } + + def testSink(probe: TestProbe): Sink[ByteString, InputStream] = { + class InputStreamSinkTestStage(val timeout: FiniteDuration) + extends InputStreamSinkStage(timeout) { + + override def createLogicAndMaterializedValue = { + val (logic, inputStream) = super.createLogicAndMaterializedValue + val inHandler = logic.inHandlers(in.id) + logic.inHandlers(in.id) = new InHandler { + override def onPush(): Unit = { + probe.ref ! InputStreamSinkTestMessages.Push + inHandler.onPush() + } + override def onUpstreamFinish(): Unit = { + probe.ref ! InputStreamSinkTestMessages.Finish + inHandler.onUpstreamFinish() + } + override def onUpstreamFailure(ex: Throwable): Unit = { + probe.ref ! InputStreamSinkTestMessages.Failure(ex) + inHandler.onUpstreamFailure(ex) + } + } + (logic, inputStream) + } + } + Sink.fromGraph(new InputStreamSinkTestStage(timeout)) + } + + "InputStreamSink" must { + "read bytes from InputStream" in assertAllStagesStopped { + val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run() + + probe.sendNext(byteString) + val arr = newArray() + inputStream.read(arr) + arr should ===(byteArray) + + probe.sendComplete() + inputStream.close() + } + + "read bytes correctly if requested by InputStream not in chunk size" in assertAllStagesStopped { + val sinkProbe = TestProbe() + val (probe, inputStream) = TestSource.probe[ByteString].toMat(testSink(sinkProbe))(Keep.both).run() + + probe.sendNext(byteString) + val byteArray2 = randomArray(3) + probe.sendNext(ByteString(byteArray2)) + + sinkProbe.expectMsgAllOf(InputStreamSinkTestMessages.Push, InputStreamSinkTestMessages.Push) + + val arr = new Array[Byte](2) + inputStream.read(arr) + arr should ===(Array(byteArray(0), byteArray(1))) + inputStream.read(arr) + arr should ===(Array(byteArray(2), byteArray2(0))) + inputStream.read(arr) + arr should ===(Array(byteArray2(1), byteArray2(2))) + + inputStream.close() + } + + "returns less than was expected when the data source has provided some but not enough data" in assertAllStagesStopped { + val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run() + + val data = randomArray(2) + probe.sendNext(ByteString(data)) + val arr = newArray() + inputStream.read(arr) should ===(2) + arr should ===(Array(data(0), data(1), 0)) + + probe.sendComplete() + inputStream.close() + } + + "block read until get requested number of bytes from upstream" in assertAllStagesStopped { + val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run() + + val arr = newArray() + val f = Future(inputStream.read(arr)) + the[Exception] thrownBy Await.result(f, timeout) shouldBe a[TimeoutException] + probe.sendNext(byteString) + expectSuccess(f, 3) + + probe.sendComplete() + inputStream.read(newArray()) + inputStream.close() + } + + "fill up buffer by default" in assertAllStagesStopped { + import system.dispatcher + val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run() + + val array2 = randomArray(3) + probe.sendNext(byteString) + probe.sendNext(ByteString(array2)) + + val arr1 = newArray() + val arr2 = newArray() + val f1 = Future(inputStream.read(arr1)) + val f2 = Future(inputStream.read(arr2)) + Await.result(f1, timeout) should be(3) + Await.result(f2, timeout) should be(3) + + arr1 should ===(byteString) + arr2 should ===(array2) + + probe.sendComplete() + inputStream.close() + } + + "throw error when reactive stream is closed" in assertAllStagesStopped { + val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run() + + probe.sendNext(byteString) + inputStream.close() + probe.expectCancellation() + the[Exception] thrownBy inputStream.read(newArray()) shouldBe a[IOException] + } + + "return all data when upstream is completed" in assertAllStagesStopped { + val sinkProbe = TestProbe() + val (probe, inputStream) = TestSource.probe[ByteString].toMat(testSink(sinkProbe))(Keep.both).run() + + val bytes = randomArray(1) + probe.sendNext(ByteString(bytes)) + sinkProbe.expectMsg(InputStreamSinkTestMessages.Push) + + probe.sendComplete() + sinkProbe.expectMsg(InputStreamSinkTestMessages.Finish) + + val arr = newArray() + val f = Future(inputStream.read(arr)) + expectSuccess(f, 1) + arr should ===(Array[Byte](bytes(0), 0, 0)) + } + + "return -1 when read after stream is completed" in assertAllStagesStopped { + val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run() + + probe.sendNext(byteString) + val arr = newArray() + inputStream.read(arr) + arr should ===(byteArray) + probe.sendComplete() + + Await.result(Future(inputStream.read(arr)), timeout) should ===(-1) + + inputStream.close() + } + + "return IOException when stream is failed" in assertAllStagesStopped { + val sinkProbe = TestProbe() + val (probe, inputStream) = TestSource.probe[ByteString].toMat(testSink(sinkProbe))(Keep.both).run() + val ex = new RuntimeException("Stream failed.") with NoStackTrace + + probe.sendNext(byteString) + sinkProbe.expectMsg(InputStreamSinkTestMessages.Push) + + val arr = newArray() + inputStream.read(arr) + + probe.sendError(ex) + sinkProbe.expectMsg(InputStreamSinkTestMessages.Failure(ex)) + val p = Future(inputStream.read(arr)) + p.onFailure { + case e ⇒ + (e.isInstanceOf[IOException] && e.getCause.equals(ex)) should ===(true) + Unit + } + p.onSuccess { case _ ⇒ fail() } + + } + + "use dedicated default-blocking-io-dispatcher by default" in assertAllStagesStopped { + val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) + val mat = ActorMaterializer()(sys) + + try { + TestSource.probe[ByteString].runWith(InputStreamSink())(mat) + mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) + val ref = expectMsgType[Children].children.find(_.path.toString contains "InputStreamSink").get + assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") + } finally shutdown(sys) + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala new file mode 100644 index 0000000000..1290c59fdb --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala @@ -0,0 +1,179 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io + +import java.io.{ IOException, OutputStream } +import java.util.concurrent.TimeoutException + +import akka.actor.{ ActorSystem, NoSerializationVerificationNeeded } +import akka.stream._ +import akka.stream.impl.StreamSupervisor.Children +import akka.stream.impl.io.OutputStreamSourceStage +import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor } +import akka.stream.scaladsl.{ Keep, Source } +import akka.stream.stage.OutHandler +import akka.stream.testkit.Utils._ +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSink +import akka.testkit.TestProbe +import akka.util.ByteString + +import scala.concurrent.duration.Duration.Zero +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Future } +import scala.util.Random + +class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { + import system.dispatcher + + val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") + implicit val materializer = ActorMaterializer(settings) + + val timeout = 300.milliseconds + val bytesArray = Array.fill[Byte](3)(Random.nextInt(1024).asInstanceOf[Byte]) + val byteString = ByteString(bytesArray) + + def expectTimeout[T](f: Future[T], timeout: Duration) = + the[Exception] thrownBy Await.result(f, timeout) shouldBe a[TimeoutException] + + def expectSuccess[T](f: Future[T], value: T) = + Await.result(f, timeout) should be(value) + + object OutputStreamSourceTestMessages { + case object Pull extends NoSerializationVerificationNeeded + case object Finish extends NoSerializationVerificationNeeded + } + + def testSource(probe: TestProbe): Source[ByteString, OutputStream] = { + class OutputStreamSourceTestStage(val timeout: FiniteDuration) + extends OutputStreamSourceStage(timeout) { + + override def createLogicAndMaterializedValue = { + val (logic, inputStream) = super.createLogicAndMaterializedValue + val outHandler = logic.outHandlers(out.id) + logic.outHandlers(out.id) = new OutHandler { + override def onDownstreamFinish(): Unit = { + probe.ref ! OutputStreamSourceTestMessages.Finish + outHandler.onDownstreamFinish() + } + override def onPull(): Unit = { + probe.ref ! OutputStreamSourceTestMessages.Pull + outHandler.onPull() + } + } + (logic, inputStream) + } + } + Source.fromGraph(new OutputStreamSourceTestStage(timeout)) + } + + "OutputStreamSource" must { + "read bytes from OutputStream" in assertAllStagesStopped { + val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run + val s = probe.expectSubscription() + + outputStream.write(bytesArray) + s.request(1) + probe.expectNext(byteString) + outputStream.close() + probe.expectComplete() + } + + "block flush call until send all buffer to downstream" in assertAllStagesStopped { + val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run + val s = probe.expectSubscription() + + outputStream.write(bytesArray) + val f = Future(outputStream.flush()) + + expectTimeout(f, timeout) + probe.expectNoMsg(Zero) + + s.request(1) + expectSuccess(f, ()) + probe.expectNext(byteString) + + outputStream.close() + probe.expectComplete() + } + + "not block flushes when buffer is empty" in assertAllStagesStopped { + val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run + val s = probe.expectSubscription() + + outputStream.write(bytesArray) + + val f = Future(outputStream.flush()) + s.request(1) + expectSuccess(f, ()) + probe.expectNext(byteString) + + val f2 = Future(outputStream.flush()) + expectSuccess(f2, ()) + + outputStream.close() + probe.expectComplete() + } + + "block writes when buffer is full" in assertAllStagesStopped { + val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both) + .withAttributes(Attributes.inputBuffer(16, 16)).run + val s = probe.expectSubscription() + + (1 to 16).foreach { _ ⇒ outputStream.write(bytesArray) } + + //blocked call + val f = Future(outputStream.write(bytesArray)) + + expectTimeout(f, timeout) + probe.expectNoMsg(Zero) + + s.request(17) + expectSuccess(f, ()) + probe.expectNextN(List.fill(17)(byteString).toSeq) + + outputStream.close() + probe.expectComplete() + } + + "throw error when write after stream is closed" in assertAllStagesStopped { + val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run + + probe.expectSubscription() + outputStream.close() + probe.expectComplete() + the[Exception] thrownBy outputStream.write(bytesArray) shouldBe a[IOException] + } + + "use dedicated default-blocking-io-dispatcher by default" in assertAllStagesStopped { + val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) + val mat = ActorMaterializer()(sys) + + try { + OutputStreamSource().runWith(TestSink.probe[ByteString])(mat) + mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) + val ref = expectMsgType[Children].children.find(_.path.toString contains "OutputStreamSource").get + assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") + } finally shutdown(sys) + + } + + "throw IOException when writing to the stream after the subscriber has cancelled the reactive stream" in assertAllStagesStopped { + val sourceProbe = TestProbe() + val (outputStream, probe) = testSource(sourceProbe).toMat(TestSink.probe[ByteString])(Keep.both).run + + val s = probe.expectSubscription() + + outputStream.write(bytesArray) + s.request(1) + sourceProbe.expectMsg(OutputStreamSourceTestMessages.Pull) + + probe.expectNext(byteString) + + s.cancel() + sourceProbe.expectMsg(OutputStreamSourceTestMessages.Finish) + the[Exception] thrownBy outputStream.write(bytesArray) shouldBe a[IOException] + } + } +} \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala index c791914267..ddaa3e6bcc 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala @@ -91,7 +91,7 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { } } - "use dedicated file-io-dispatcher by default" in assertAllStagesStopped { + "use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped { targetFile { f ⇒ val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val mat = ActorMaterializer()(sys) @@ -102,7 +102,7 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get - assertDispatcher(ref, "akka.stream.default-file-io-dispatcher") + assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") } finally shutdown(sys) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala index b1936e51c1..656a5bf328 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala @@ -166,7 +166,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } } - "use dedicated file-io-dispatcher by default" in assertAllStagesStopped { + "use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped { val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val mat = ActorMaterializer()(sys) implicit val timeout = Timeout(500.millis) @@ -176,7 +176,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get - try assertDispatcher(ref, "akka.stream.default-file-io-dispatcher") finally p.cancel() + try assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") finally p.cancel() } finally shutdown(sys) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala index 311a8320b0..9bebe5e48c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala @@ -138,12 +138,7 @@ class GraphFlowSpec extends AkkaSpec { "work with a Sink when having KeyedSource inside" in { val probe = TestSubscriber.manualProbe[Int]() - - val source = Source.fromGraph(FlowGraph.create(Source.subscriber[Int]) { implicit b ⇒ - subSource ⇒ - SourceShape(subSource.outlet) - }) - + val source = Source.subscriber[Int] val mm: Subscriber[Int] = source.to(Sink(probe)).run() source1.to(Sink(mm)).run() diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index 0331d9fc99..0bf4ed2e5d 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -45,9 +45,9 @@ akka { # Fully qualified config path which holds the dispatcher configuration # to be used by FlowMaterialiser when creating Actors for IO operations, # such as FileSource, FileSink and others. - file-io-dispatcher = "akka.stream.default-file-io-dispatcher" + blocking-io-dispatcher = "akka.stream.default-blocking-io-dispatcher" - default-file-io-dispatcher { + default-blocking-io-dispatcher { type = "Dispatcher" executor = "thread-pool-executor" throughput = 1 diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 05703b1293..485c87109d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -78,6 +78,7 @@ private[stream] object Stages { val synchronousFileSource = name("synchronousFileSource") val inputStreamSource = name("inputStreamSource") val acknowledgeSource = name("acknowledgeSource") + val outputStreamSource = name("outputStreamSource") val subscriberSink = name("subscriberSink") val cancelledSink = name("cancelledSink") @@ -90,6 +91,7 @@ private[stream] object Stages { val synchronousFileSink = name("synchronousFileSink") val outputStreamSink = name("outputStreamSink") val acknowledgeSink = name("acknowledgeSink") + val inputStreamSink = name("inputStreamSink") } import DefaultAttributes._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSettings.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSettings.scala index f09c86b5d5..2e6da93a1d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSettings.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSettings.scala @@ -4,11 +4,12 @@ import akka.stream.ActorAttributes.Dispatcher import akka.stream.{ ActorMaterializer, MaterializationContext } private[stream] object IOSettings { - /** Picks default akka.stream.file-io-dispatcher or the Attributes configured one */ - def fileIoDispatcher(context: MaterializationContext): String = { + + /** Picks default akka.stream.blocking-io-dispatcher or the Attributes configured one */ + def blockingIoDispatcher(context: MaterializationContext): String = { val mat = ActorMaterializer.downcast(context.materializer) context.effectiveAttributes.attributeList.collectFirst { case d: Dispatcher ⇒ d.dispatcher } getOrElse { - mat.system.settings.config.getString("akka.stream.file-io-dispatcher") + mat.system.settings.config.getString("akka.stream.blocking-io-dispatcher") } } } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index db6578f3d3..4573934055 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -4,10 +4,11 @@ package akka.stream.impl.io import java.io.{ File, OutputStream } +import java.lang.{ Long ⇒ JLong } +import akka.stream._ import akka.stream.impl.SinkModule import akka.stream.impl.StreamLayout.Module -import akka.stream.{ ActorMaterializer, MaterializationContext, Attributes, SinkShape } import akka.util.ByteString import scala.concurrent.{ Future, Promise } @@ -26,7 +27,7 @@ private[akka] final class SynchronousFileSink(f: File, append: Boolean, val attr val bytesWrittenPromise = Promise[Long]() val props = SynchronousFileSubscriber.props(f, bytesWrittenPromise, settings.maxInputBufferSize, append) - val dispatcher = IOSettings.fileIoDispatcher(context) + val dispatcher = IOSettings.blockingIoDispatcher(context) val ref = mat.actorOf(context, props.withDispatcher(dispatcher)) (akka.stream.actor.ActorSubscriber[ByteString](ref), bytesWrittenPromise.future) @@ -66,3 +67,4 @@ private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, va override def withAttributes(attr: Attributes): Module = new OutputStreamSink(createOutput, attr, amendShape(attr)) } + diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index 633e998050..fa2cb7b7d2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -3,15 +3,22 @@ */ package akka.stream.impl.io -import java.io.{ File, InputStream } +import java.io.{ File, IOException, InputStream, OutputStream } +import java.lang.{ Long ⇒ JLong } +import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue } +import akka.actor.{ ActorRef, Deploy } +import akka.japi import akka.stream._ import akka.stream.impl.StreamLayout.Module import akka.stream.impl.{ ErrorPublisher, SourceModule } -import akka.util.ByteString +import akka.stream.scaladsl.{ Source, FlowGraph } +import akka.util.{ ByteString, Timeout } import org.reactivestreams._ -import scala.concurrent.{ Future, Promise } +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ Await, Future, Promise } +import scala.util.control.NonFatal /** * INTERNAL API @@ -26,7 +33,7 @@ private[akka] final class SynchronousFileSource(f: File, chunkSize: Int, val att val bytesReadPromise = Promise[Long]() val props = SynchronousFilePublisher.props(f, bytesReadPromise, chunkSize, settings.initialInputBufferSize, settings.maxInputBufferSize) - val dispatcher = IOSettings.fileIoDispatcher(context) + val dispatcher = IOSettings.blockingIoDispatcher(context) val ref = mat.actorOf(context, props.withDispatcher(dispatcher)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala new file mode 100644 index 0000000000..aec1109411 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -0,0 +1,200 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.impl.io + +import java.io.{ IOException, InputStream } +import java.util.concurrent.{ BlockingQueue, LinkedBlockingDeque, TimeUnit } + +import akka.stream.Attributes.InputBuffer +import akka.stream.impl.io.InputStreamSinkStage._ +import akka.stream.stage._ +import akka.util.ByteString + +import scala.annotation.tailrec +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration + +private[akka] object InputStreamSinkStage { + + sealed trait AdapterToStageMessage + case object ReadElementAcknowledgement extends AdapterToStageMessage + case object Close extends AdapterToStageMessage + + sealed trait StreamToAdapterMessage + case class Data(data: ByteString) extends StreamToAdapterMessage + case object Finished extends StreamToAdapterMessage + case class Failed(cause: Throwable) extends StreamToAdapterMessage + + sealed trait StageWithCallback { + def wakeUp(msg: AdapterToStageMessage): Unit + } +} + +/** + * INTERNAL API + */ +private[akka] class InputStreamSinkStage(timeout: FiniteDuration) extends SinkStage[ByteString, InputStream]("InputStreamSinkStage") { + val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max + require(maxBuffer > 0, "Buffer size must be greater than 0") + + override def createLogicAndMaterializedValue: (GraphStageLogic, InputStream) = { + + val dataQueue = new LinkedBlockingDeque[StreamToAdapterMessage](maxBuffer + 1) + var pullRequestIsSent = true + + val logic = new GraphStageLogic(shape) with StageWithCallback { + + private val callback: AsyncCallback[AdapterToStageMessage] = + getAsyncCallback(onAsyncMessage) + + override def wakeUp(msg: AdapterToStageMessage): Unit = { + if (!isClosed(in)) { + Future(callback.invoke(msg))(interpreter.materializer.executionContext) + } + } + + private def onAsyncMessage(event: AdapterToStageMessage): Unit = + event match { + case ReadElementAcknowledgement ⇒ + sendPullIfAllowed() + case Close ⇒ + completeStage() + } + + private def sendPullIfAllowed(): Unit = + if (!pullRequestIsSent) { + pullRequestIsSent = true + pull(in) + } + + override def preStart() = pull(in) + + setHandler(in, new InHandler { + override def onPush(): Unit = { + //1 is buffer for Finished or Failed callback + require(dataQueue.remainingCapacity() > 1) + pullRequestIsSent = false + dataQueue.add(Data(grab(in))) + if (dataQueue.remainingCapacity() > 1) sendPullIfAllowed() + } + override def onUpstreamFinish(): Unit = { + dataQueue.add(Finished) + completeStage() + } + override def onUpstreamFailure(ex: Throwable): Unit = { + dataQueue.add(Failed(ex)) + failStage(ex) + } + }) + } + (logic, new InputStreamAdapter(dataQueue, logic.wakeUp, timeout)) + } +} + +/** + * INTERNAL API + * InputStreamAdapter that interacts with InputStreamSinkStage + */ +private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapterMessage], + sendToStage: (AdapterToStageMessage) ⇒ Unit, + timeout: FiniteDuration) + extends InputStream { + + var isActive = true + var isStageAlive = true + val subscriberClosedException = new IOException("Reactive stream is terminated, no reads are possible") + var skipBytes = 0 + var detachedChunk: Option[ByteString] = None + + @scala.throws(classOf[IOException]) + private[this] def executeIfNotClosed[T](f: () ⇒ T): T = + if (isActive) f() + else throw subscriberClosedException + + @scala.throws(classOf[IOException]) + override def read(): Int = { + val a = Array[Byte](1) + if (read(a, 0, 1) != -1) a(0) + else -1 + } + + @scala.throws(classOf[IOException]) + override def read(a: Array[Byte]): Int = read(a, 0, a.length) + + @scala.throws(classOf[IOException]) + override def read(a: Array[Byte], begin: Int, length: Int): Int = { + executeIfNotClosed(() ⇒ + if (isStageAlive) { + detachedChunk match { + case None ⇒ + sharedBuffer.poll(timeout.toMillis, TimeUnit.MILLISECONDS) match { + case Data(data) ⇒ + detachedChunk = Some(data) + readBytes(a, begin, length) + case Finished ⇒ + isStageAlive = false + -1 + case Failed(ex) ⇒ + isStageAlive = false + throw new IOException(ex) + } + case Some(data) ⇒ + readBytes(a, begin, length) + } + } else -1) + } + + private[this] def readBytes(a: Array[Byte], begin: Int, length: Int): Int = { + val availableInChunk = detachedChunk.size - skipBytes + val readBytes = getData(a, begin, length, 0) + if (readBytes >= availableInChunk) sendToStage(ReadElementAcknowledgement) + readBytes + } + + @scala.throws(classOf[IOException]) + override def close(): Unit = { + executeIfNotClosed(() ⇒ { + // at this point Subscriber may be already terminated + if (isStageAlive) sendToStage(Close) + isActive = false + }) + } + + @tailrec + private[this] def getData(arr: Array[Byte], begin: Int, length: Int, + gotBytes: Int): Int = { + getDataChunk() match { + case Some(data) ⇒ + val size = data.size - skipBytes + if (size + gotBytes <= length) { + System.arraycopy(data.toArray, skipBytes, arr, begin, size) + skipBytes = 0 + detachedChunk = None + if (length - size == 0) + gotBytes + size + else + getData(arr, begin + size, length - size, gotBytes + size) + } else { + System.arraycopy(data.toArray, skipBytes, arr, begin, length) + skipBytes = length + gotBytes + length + } + case None ⇒ gotBytes + } + } + + private[this] def getDataChunk(): Option[ByteString] = { + detachedChunk match { + case None ⇒ + sharedBuffer.poll() match { + case Data(data) ⇒ + detachedChunk = Some(data) + detachedChunk + case _ ⇒ None + } + case Some(_) ⇒ detachedChunk + } + } +} + diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala new file mode 100644 index 0000000000..29f8e7dedb --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala @@ -0,0 +1,178 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.impl.io + +import java.io.{ IOException, OutputStream } +import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.{ BlockingQueue, LinkedBlockingQueue } + +import akka.stream.Attributes.InputBuffer +import akka.stream.impl.io.OutputStreamSourceStage._ +import akka.stream.stage._ +import akka.util.ByteString + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ Await, Future, Promise } +import scala.util.control.NonFatal +import scala.util.{ Failure, Success, Try } + +private[akka] object OutputStreamSourceStage { + sealed trait AdapterToStageMessage + case object Flush extends AdapterToStageMessage + case object Close extends AdapterToStageMessage + + sealed trait DownstreamStatus + case object Ok extends DownstreamStatus + case object Canceled extends DownstreamStatus + + sealed trait StageWithCallback { + def wakeUp(msg: AdapterToStageMessage): Future[Unit] + } +} + +private[akka] class OutputStreamSourceStage(timeout: FiniteDuration) extends SourceStage[ByteString, OutputStream]("OutputStreamSourceStage") { + val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max + require(maxBuffer > 0, "Buffer size must be greater than 0") + + override def createLogicAndMaterializedValue: (GraphStageLogic, OutputStream) = { + val dataQueue = new LinkedBlockingQueue[ByteString](maxBuffer) + + var flush: Option[Promise[Unit]] = None + var close: Option[Promise[Unit]] = None + val downstreamStatus = new AtomicReference[DownstreamStatus](Ok) + + val logic = new GraphStageLogic(shape) with StageWithCallback { + private val downstreamCallback: AsyncCallback[Try[ByteString]] = + getAsyncCallback(onAsyncElem) + + private val upstreamCallback: AsyncCallback[(AdapterToStageMessage, Promise[Unit])] = + getAsyncCallback(onAsyncMessage) + + override def wakeUp(msg: AdapterToStageMessage): Future[Unit] = { + implicit val ex = interpreter.materializer.executionContext + val p = Promise[Unit]() + Future(upstreamCallback.invoke((msg, p))) + p.future + } + + private def onAsyncMessage(event: (AdapterToStageMessage, Promise[Unit])): Unit = + event._1 match { + case Flush ⇒ + flush = Some(event._2) + sendResponseIfNeed() + case Close ⇒ + close = Some(event._2) + if (dataQueue.isEmpty) { + downstreamStatus.set(Canceled) + completeStage() + unblockUpstream() + } else sendResponseIfNeed() + } + + private def onAsyncElem(event: Try[ByteString]): Unit = event match { + case Success(elem) ⇒ onPush(elem) + case Failure(ex) ⇒ failStage(ex) + } + + private def unblockUpstream(): Boolean = + flush match { + case Some(p) ⇒ + p.complete(Success(())) + flush = None + true + case None ⇒ close match { + case Some(p) ⇒ + p.complete(Success(())) + close = None + true + case None ⇒ false + } + } + + private def sendResponseIfNeed(): Unit = + if (downstreamStatus.get() == Canceled || dataQueue.isEmpty) unblockUpstream() + + private def onPush(data: ByteString): Unit = + if (downstreamStatus.get() == Ok) { + push(out, data) + sendResponseIfNeed() + } + + setHandler(out, new OutHandler { + override def onDownstreamFinish(): Unit = { + //assuming there can be no further in messages + downstreamStatus.set(Canceled) + dataQueue.clear() + completeStage() + } + override def onPull(): Unit = { + implicit val ex = interpreter.materializer.executionContext + Future(dataQueue.take()).onComplete(downstreamCallback.invoke) + } + }) + } + (logic, new OutputStreamAdapter(dataQueue, downstreamStatus, logic.wakeUp, timeout)) + } +} + +private[akka] class OutputStreamAdapter(dataQueue: BlockingQueue[ByteString], + downstreamStatus: AtomicReference[DownstreamStatus], + sendToStage: (AdapterToStageMessage) ⇒ Future[Unit], + timeout: FiniteDuration) + extends OutputStream { + + var isActive = true + var isPublisherAlive = true + val publisherClosedException = new IOException("Reactive stream is terminated, no writes are possible") + + private[this] def send(sendAction: () ⇒ Unit): Unit = { + if (isActive) { + if (isPublisherAlive) { + sendAction() + } else throw publisherClosedException + } else throw new IOException("OutputStream is closed") + } + + private[this] def sendData(data: ByteString): Unit = + send(() ⇒ { + dataQueue.put(data) + if (downstreamStatus.get() == Canceled) { + isPublisherAlive = false + throw publisherClosedException + } + }) + + private[this] def sendMessage(message: AdapterToStageMessage, handleCancelled: Boolean = true) = + send(() ⇒ + try { + Await.ready(sendToStage(message), timeout) + if (downstreamStatus.get() == Canceled && handleCancelled) { + //Publisher considered to be terminated at earliest convenience to minimize messages sending back and forth + isPublisherAlive = false + throw publisherClosedException + } + } catch { + case e: IOException ⇒ throw e + case NonFatal(e) ⇒ throw new IOException(e) + }) + + @scala.throws(classOf[IOException]) + override def write(b: Int): Unit = { + sendData(ByteString(b)) + } + + @scala.throws(classOf[IOException]) + override def write(b: Array[Byte], off: Int, len: Int): Unit = { + sendData(ByteString.fromArray(b, off, len)) + } + + @scala.throws(classOf[IOException]) + override def flush(): Unit = sendMessage(Flush) + + @scala.throws(classOf[IOException]) + override def close(): Unit = { + sendMessage(Close, handleCancelled = false) + isActive = false + } +} diff --git a/akka-stream/src/main/scala/akka/stream/io/InputStreamSink.scala b/akka-stream/src/main/scala/akka/stream/io/InputStreamSink.scala new file mode 100644 index 0000000000..d6dc0cb396 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/io/InputStreamSink.scala @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io + +import java.io.InputStream +import java.lang.{ Long ⇒ JLong } + +import akka.stream.impl.io.InputStreamSinkStage +import akka.stream.scaladsl.Sink +import akka.stream.{ Attributes, ActorAttributes, javadsl } +import akka.util.ByteString + +import scala.concurrent.duration.{ FiniteDuration, _ } +import scala.language.postfixOps + +/** + * Sink which allows to use [[java.io.InputStream]] to interact with reactive stream. + */ +object InputStreamSink { + + /** + * Creates a synchronous (Java 6 compatible) Sink + * + * It materializes an [[java.io.InputStream]] to interacting with reactive stream. + * + * This sink is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`, + * unless configured otherwise by using [[ActorAttributes]]. + */ + def apply(timeout: FiniteDuration = 5 seconds): Sink[ByteString, InputStream] = + Sink.fromGraph(new InputStreamSinkStage(timeout)) + .withAttributes(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher") and + Attributes.name("InputStreamSink")) + + /** + * Creates a synchronous (Java 6 compatible) Sink + * + * It materializes an [[java.io.InputStream]] to interacting with reactive stream. + * + * This sink is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`, + * unless configured otherwise by using [[akka.stream.ActorAttributes]]. + */ + def create(): javadsl.Sink[ByteString, InputStream] = + new javadsl.Sink(apply()) + + /** + * Creates a synchronous (Java 6 compatible) Sink + * + * It materializes an [[java.io.InputStream]] to interacting with reactive stream. + * + * This sink is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`, + * unless configured otherwise by using [[akka.stream.ActorAttributes]]. + */ + def create(timeout: FiniteDuration): javadsl.Sink[ByteString, InputStream] = + new javadsl.Sink(apply(timeout)) + +} diff --git a/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala b/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala index bbe5f3f122..833862bd5a 100644 --- a/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala +++ b/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala @@ -25,7 +25,7 @@ object OutputStreamSink { * * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. * - * This source is backed by an Actor which will use the dedicated `akka.stream.file-io-dispatcher`, + * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, * unless configured otherwise by using [[ActorAttributes]]. */ def apply(output: () ⇒ OutputStream): Sink[ByteString, Future[Long]] = diff --git a/akka-stream/src/main/scala/akka/stream/io/OutputStreamSource.scala b/akka-stream/src/main/scala/akka/stream/io/OutputStreamSource.scala new file mode 100644 index 0000000000..b3f7959958 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/io/OutputStreamSource.scala @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io + +import java.io.OutputStream + +import akka.stream.Attributes.Name +import akka.stream._ +import akka.stream.impl.io.OutputStreamSourceStage +import akka.stream.scaladsl.{ FlowGraph, Source } +import akka.util.ByteString + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.language.implicitConversions + +/** + * Source which allows to use [[java.io.OutputStream]] to interact with reactive stream. + */ +object OutputStreamSource { + import scala.language.postfixOps + + /** + * Creates a synchronous (Java 6 compatible) Source. + * + * It materializes an [[java.io.OutputStream]] to interact with reactive stream. + * + * This source is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`, + * unless configured otherwise by using [[akka.stream.ActorAttributes]]. + */ + def apply(timeout: FiniteDuration = 5.seconds): Source[ByteString, OutputStream] = + Source.fromGraph(new OutputStreamSourceStage(timeout)) + .withAttributes(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher") and + Attributes.name("OutputStreamSource")) + + /** + * Creates a synchronous (Java 6 compatible) Source. + * + * It materializes an [[java.io.OutputStream]] to interact with reactive stream. + * + * This source is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`, + * unless configured otherwise by using [[akka.stream.ActorAttributes]]. + */ + def create(): javadsl.Source[ByteString, OutputStream] = + new javadsl.Source(apply()) + + /** + * Creates a synchronous (Java 6 compatible) Source. + * + * It materializes an [[java.io.OutputStream]] to interacting with reactive stream. + * + * This source is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`, + * unless configured otherwise by using [[akka.stream.ActorAttributes]]. + */ + def create(timeout: FiniteDuration): javadsl.Source[ByteString, OutputStream] = { + new javadsl.Source(apply(timeout)) + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala index bafcdf6f5f..f5fabdbec6 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala @@ -24,7 +24,7 @@ object SynchronousFileSink { * * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. * - * This source is backed by an Actor which will use the dedicated `akka.stream.file-io-dispatcher`, + * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, * unless configured otherwise by using [[ActorAttributes]]. */ def apply(f: File, append: Boolean = false): Sink[ByteString, Future[Long]] = @@ -38,7 +38,7 @@ object SynchronousFileSink { * * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. * - * This source is backed by an Actor which will use the dedicated `akka.stream.file-io-dispatcher`, + * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, * unless configured otherwise by using [[ActorAttributes]]. */ def create(f: File): javadsl.Sink[ByteString, Future[java.lang.Long]] = @@ -51,7 +51,7 @@ object SynchronousFileSink { * * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. * - * This source is backed by an Actor which will use the dedicated `akka.stream.file-io-dispatcher`, + * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, * unless configured otherwise by using [[ActorAttributes]]. */ def appendTo(f: File): javadsl.Sink[ByteString, Future[java.lang.Long]] = diff --git a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala index 31f55e2f5f..f038595952 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala @@ -19,7 +19,7 @@ object SynchronousFileSource { * Emitted elements are `chunkSize` sized [[ByteString]] elements. * * This source is backed by an Actor which will use the dedicated thread-pool base dispatcher. - * You can configure the default dispatcher for this Source by changing the `akka.stream.file-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. @@ -32,7 +32,7 @@ object SynchronousFileSource { * Emitted elements are [[ByteString]] elements, chunked by default by [[DefaultChunkSize]] bytes. * * This source is backed by an Actor which will use the dedicated thread-pool base dispatcher. - * You can configure the default dispatcher for this Source by changing the `akka.stream.file-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. @@ -44,7 +44,7 @@ object SynchronousFileSource { * Emitted elements are `chunkSize` sized [[ByteString]] elements. * * This source is backed by an Actor which will use the dedicated thread-pool base dispatcher. - * You can configure the default dispatcher for this Source by changing the `akka.stream.file-io-dispatcher` or + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index fdc3bc5e7c..02428b22fb 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -45,6 +45,24 @@ abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, def createLogic: GraphStageLogic } +/** + * A SourceStage represents a reusable graph stream processing stage. A SourceStage consists of a [[akka.stream.Shape]] which describes + * its output port. + */ +abstract class SourceStage[Out, M](name: String) extends GraphStageWithMaterializedValue[SourceShape[Out], M] { + val out: Outlet[Out] = Outlet[Out](name + ".out") + override val shape: SourceShape[Out] = SourceShape(out) +} + +/** + * A SinkStage represents a reusable graph stream processing stage. A SinkStage consists of a [[akka.stream.Shape]] which describes + * its input port. + */ +abstract class SinkStage[In, M](name: String) extends GraphStageWithMaterializedValue[SinkShape[In], M] { + val in: Inlet[In] = Inlet[In](name + ".in") + override val shape: SinkShape[In] = SinkShape(in) +} + private object TimerMessages { final case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) extends DeadLetterSuppression final case class Timer(id: Int, task: Cancellable)