From cebd9bf1ae7ed5b73ddc6f9abb7ea51bbed6d755 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Thu, 16 Apr 2015 02:24:01 +0200 Subject: [PATCH] +str #15588,#17229 Java 6 Synchronous File Sink / Source These are synchronous implementations, because we need to be Java 6 compatible while developing on 2.3.x. However asynchronous implementations using AsynchronousFileChannel will come soon for JDK7 users. + ActorPublisher/Subscriber now manage stopping of the actor + added documentation on configuring dispatcher for File IO + properly handle if source file does not exist + file sink / source come with default io dispatcher > verified no actors are leaking > exceptions are caught and onErrored properly + moved files to akka.stream.io + Added OutputStreamSink and InputStreamSource --- akka-docs-dev/rst/java/stream-io.rst | 44 +++- .../docs/stream/io/StreamFileDocSpec.scala | 57 +++++ .../stream/{ => io}/StreamTcpDocSpec.scala | 12 +- akka-docs-dev/rst/scala/stream-io.rst | 44 +++- akka-http/src/main/resources/reference.conf | 2 +- .../stream/tck/ActorSystemLifecycle.scala | 2 +- .../tck/SynchronousFilePublisherTest.scala | 48 +++++ .../akka/stream/testkit/StreamTestKit.scala | 24 ++- .../stream/actor/ActorPublisherSpec.scala | 45 ++-- .../stream/actor/ActorSubscriberSpec.scala | 37 +++- .../stream/io/InputStreamSourceSpec.scala | 42 ++++ .../akka/stream/io/OutputStreamSinkSpec.scala | 66 ++++++ .../stream/io/SynchronousFileSinkSpec.scala | 135 ++++++++++++ .../stream/io/SynchronousFileSourceSpec.scala | 195 ++++++++++++++++++ akka-stream/src/main/resources/reference.conf | 16 ++ .../akka/stream/ActorFlowMaterializer.scala | 14 +- .../akka/stream/actor/ActorPublisher.scala | 25 ++- .../akka/stream/actor/ActorSubscriber.scala | 22 +- .../impl/ActorFlowMaterializerImpl.scala | 12 +- .../impl/{Sources.scala => Modules.scala} | 8 +- .../main/scala/akka/stream/impl/Sinks.scala | 4 + .../main/scala/akka/stream/impl/Stages.scala | 6 +- .../akka/stream/io/InputStreamSource.scala | 52 +++++ .../akka/stream/io/OutputStreamSink.scala | 43 ++++ .../akka/stream/io/SynchronousFileSink.scala | 60 ++++++ .../stream/io/SynchronousFileSource.scala | 59 ++++++ .../akka/stream/io/impl/IOSettings.scala | 14 ++ .../scala/akka/stream/io/impl/IOSinks.scala | 69 +++++++ .../scala/akka/stream/io/impl/IOSources.scala | 74 +++++++ .../stream/io/impl/InputStreamPublisher.scala | 121 +++++++++++ .../io/impl/OutputStreamSubscriber.scala | 60 ++++++ .../io/impl/SynchronousFilePublisher.scala | 121 +++++++++++ .../io/impl/SynchronousFileSubscriber.scala | 81 ++++++++ .../main/scala/akka/stream/javadsl/Sink.scala | 15 +- .../scala/akka/stream/javadsl/Source.scala | 5 +- .../scala/akka/stream/scaladsl/Sink.scala | 19 +- .../scala/akka/stream/scaladsl/Source.scala | 14 +- 37 files changed, 1581 insertions(+), 86 deletions(-) create mode 100644 akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala rename akka-docs-dev/rst/scala/code/docs/stream/{ => io}/StreamTcpDocSpec.scala (94%) create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/SynchronousFilePublisherTest.scala create mode 100644 akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala create mode 100644 akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala create mode 100644 akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala create mode 100644 akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala rename akka-stream/src/main/scala/akka/stream/impl/{Sources.scala => Modules.scala} (98%) create mode 100644 akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala create mode 100644 akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala create mode 100644 akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala create mode 100644 akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala create mode 100644 akka-stream/src/main/scala/akka/stream/io/impl/IOSettings.scala create mode 100644 akka-stream/src/main/scala/akka/stream/io/impl/IOSinks.scala create mode 100644 akka-stream/src/main/scala/akka/stream/io/impl/IOSources.scala create mode 100644 akka-stream/src/main/scala/akka/stream/io/impl/InputStreamPublisher.scala create mode 100644 akka-stream/src/main/scala/akka/stream/io/impl/OutputStreamSubscriber.scala create mode 100644 akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFilePublisher.scala create mode 100644 akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFileSubscriber.scala diff --git a/akka-docs-dev/rst/java/stream-io.rst b/akka-docs-dev/rst/java/stream-io.rst index 9b65b9b21d..f58af3c5a5 100644 --- a/akka-docs-dev/rst/java/stream-io.rst +++ b/akka-docs-dev/rst/java/stream-io.rst @@ -4,19 +4,22 @@ Working with streaming IO ######################### -Akka Streams provides a way of handling TCP connections with Streams. +Akka Streams provides a way of handling File IO and TCP connections with Streams. While the general approach is very similar to the `Actor based TCP handling`_ using Akka IO, by using Akka Streams you are freed of having to manually react to back-pressure signals, as the library does it transparently for you. .. _Actor based TCP handling: http://doc.akka.io/docs/akka/current/java/io-tcp.html +Streaming TCP +============= + Accepting connections: Echo Server -================================== +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ In order to implement a simple EchoServer we ``bind`` to a given address, which returns a ``Source[IncomingConnection]``, which will emit an :class:`IncomingConnection` element for each new connection that the Server should handle: -.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTcpDocTest.java#echo-server-simple-bind +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/io/StreamTcpDocTest.java#echo-server-simple-bind Next, we simply handle *each* incoming connection using a :class:`Flow` which will be used as the processing stage to handle and emit ByteStrings from and to the TCP Socket. Since one :class:`ByteString` does not have to necessarily @@ -24,7 +27,7 @@ correspond to exactly one line of text (the client might be sending the line in recipe from the :ref:`cookbook-parse-lines-java` Akka Streams Cookbook recipe to chunk the inputs up into actual lines of text. In this example we simply add exclamation marks to each incoming text message and push it through the flow: -.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTcpDocTest.java#echo-server-simple-handle +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/io/StreamTcpDocTest.java#echo-server-simple-handle Notice that while most building blocks in Akka Streams are reusable and freely shareable, this is *not* the case for the incoming connection Flow, since it directly corresponds to an existing, already accepted connection its handling can @@ -42,13 +45,13 @@ We can then test the TCP server by sending data to the TCP Socket using ``netcat Hello World!!! Connecting: REPL Client -======================= +^^^^^^^^^^^^^^^^^^^^^^^ In this example we implement a rather naive Read Evaluate Print Loop client over TCP. Let's say we know a server has exposed a simple command line interface over TCP, and would like to interact with it using Akka Streams over TCP. To open an outgoing connection socket we use the ``outgoingConnection`` method: -.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTcpDocTest.java#repl-client +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/io/StreamTcpDocTest.java#repl-client The ``repl`` flow we use to handle the server interaction first prints the servers response, then awaits on input from the command line (this blocking call is used here just for the sake of simplicity) and converts it to a @@ -61,7 +64,7 @@ a separate mapAsync step and have a way to let the server write more data than o these improvements however are left as exercise for the reader. Avoiding deadlocks and liveness issues in back-pressured cycles -=============================================================== +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ When writing such end-to-end back-pressured systems you may sometimes end up in a situation of a loop, in which *either side is waiting for the other one to start the conversation*. One does not need to look far to find examples of such back-pressure loops. In the two examples shown previously, we always assumed that the side we @@ -80,7 +83,7 @@ Thankfully in most situations finding the right spot to start the conversation i to the protocol we are trying to implement using Streams. In chat-like applications, which our examples resemble, it makes sense to make the Server initiate the conversation by emitting a "hello" message: -.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTcpDocTest.java#welcome-banner-chat-server +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/io/StreamTcpDocTest.java#welcome-banner-chat-server The way we constructed a :class:`Flow` using a :class:`PartialFlowGraph` is explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-java`, however the basic concepts is rather simple– @@ -93,3 +96,28 @@ logic in Flows and attaching those to :class:`StreamIO` in order to implement yo In this example both client and server may need to close the stream based on a parsed command command - ``BYE`` in the case of the server, and ``q`` in the case of the client. This is implemented by using a custom :class:`PushStage` (see :ref:`stream-using-push-pull-stage-java`) which completes the stream once it encounters such command. + +Streaming File IO +================= + +Akka Streams provide simple Sources and Sinks that can work with :class:`ByteString` instances to perform IO operations +on files. + +.. note:: + Since the current version of Akka (``2.3.x``) needs to support JDK6, the currently provided File IO implementations + are not able to utilise Asynchronous File IO operations, as these were introduced in JDK7 (and newer). + Once Akka is free to require JDK8 (from ``2.4.x``) these implementations will be updated to make use of the + new NIO APIs (i.e. :class:`AsynchronousFileChannel`). + +Streaming data from a file is as easy as defining a `SynchronousFileSource` given a target file, and an optional +``chunkSize`` which determines the buffer size determined as one "element" in such stream: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/io/StreamFileDocTest.java#file-source + +Please note that these processing stages are backed by Actors and by default are configured to run on a pre-configured +threadpool-backed dispatcher dedicated for File IO. This is very important as it isolates the blocking file IO operations from the rest +of the ActorSystem allowing each dispatcher to be utilised in the most efficient way. If you want to configure a custom +dispatcher for file IO operations globally, you can do so by changing the ``akka.strea.file-io-dispatcher``, +or for a specific stage by spefifying a custom Dispatcher in code, like this: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/io/StreamFileDocTest.java#custom-dispatcher-code diff --git a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala new file mode 100644 index 0000000000..40ab5254df --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2015 Typesafe Inc. + */ +package docs.stream.io + +import java.io.File + +import akka.stream._ +import akka.stream.io.SynchronousFileSource +import akka.stream.io.SynchronousFileSink +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import akka.util.ByteString + +class StreamFileDocSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxConfig) { + + implicit val ec = system.dispatcher + implicit val mat = ActorFlowMaterializer() + + // silence sysout + def println(s: String) = () + + val file = File.createTempFile(getClass.getName, ".tmp") + + override def afterTermination() = file.delete() + + { + //#file-source + import akka.stream.io._ + //#file-source + } + + { + //#file-source + val file = new File("example.csv") + //#file-source + } + + "read data from a file" in { + //#file-source + def handle(b: ByteString): Unit //#file-source + = () + + //#file-source + + SynchronousFileSource(file) + .runForeach((chunk: ByteString) ⇒ handle(chunk)) + //#file-source + } + + "configure dispatcher in code" in { + //#custom-dispatcher-code + SynchronousFileSink(file) + .withAttributes(ActorOperationAttributes.dispatcher("custom-file-io-dispatcher")) + //#custom-dispatcher-code + } +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala similarity index 94% rename from akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala rename to akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala index 8f6dba70b4..a035af288c 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala @@ -1,20 +1,22 @@ /** * Copyright (C) 2014 Typesafe Inc. */ -package docs.stream +package docs.stream.io import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference -import akka.actor.ActorSystem + import akka.stream._ +import akka.stream.scaladsl.StreamTcp._ import akka.stream.scaladsl._ -import akka.stream.stage.{ PushStage, SyncDirective, Context } +import akka.stream.stage.Context +import akka.stream.stage.PushStage +import akka.stream.stage.SyncDirective import akka.stream.testkit.AkkaSpec import akka.testkit.TestProbe import akka.util.ByteString -import cookbook.RecipeParseLines +import docs.stream.cookbook.RecipeParseLines import docs.utils.TestUtils -import StreamTcp._ import scala.concurrent.Future diff --git a/akka-docs-dev/rst/scala/stream-io.rst b/akka-docs-dev/rst/scala/stream-io.rst index 6af31277df..1df2916927 100644 --- a/akka-docs-dev/rst/scala/stream-io.rst +++ b/akka-docs-dev/rst/scala/stream-io.rst @@ -4,19 +4,22 @@ Working with streaming IO ######################### -Akka Streams provides a way of handling TCP connections with Streams. +Akka Streams provides a way of handling File IO and TCP connections with Streams. While the general approach is very similar to the `Actor based TCP handling`_ using Akka IO, by using Akka Streams you are freed of having to manually react to back-pressure signals, as the library does it transparently for you. .. _Actor based TCP handling: http://doc.akka.io/docs/akka/current/scala/io-tcp.html +Streaming TCP +============= + Accepting connections: Echo Server -================================== +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ In order to implement a simple EchoServer we ``bind`` to a given address, which returns a ``Source[IncomingConnection]``, which will emit an :class:`IncomingConnection` element for each new connection that the Server should handle: -.. includecode:: code/docs/stream/StreamTcpDocSpec.scala#echo-server-simple-bind +.. includecode:: code/docs/stream/io/StreamTcpDocSpec.scala#echo-server-simple-bind Next, we simply handle *each* incoming connection using a :class:`Flow` which will be used as the processing stage to handle and emit ByteStrings from and to the TCP Socket. Since one :class:`ByteString` does not have to necessarily @@ -24,7 +27,7 @@ correspond to exactly one line of text (the client might be sending the line in recipe from the :ref:`cookbook-parse-lines-scala` Akka Streams Cookbook recipe to chunk the inputs up into actual lines of text. In this example we simply add exclamation marks to each incoming text message and push it through the flow: -.. includecode:: code/docs/stream/StreamTcpDocSpec.scala#echo-server-simple-handle +.. includecode:: code/docs/stream/io/StreamTcpDocSpec.scala#echo-server-simple-handle Notice that while most building blocks in Akka Streams are reusable and freely shareable, this is *not* the case for the incoming connection Flow, since it directly corresponds to an existing, already accepted connection its handling can @@ -42,13 +45,13 @@ We can then test the TCP server by sending data to the TCP Socket using ``netcat Hello World!!! Connecting: REPL Client -======================= +^^^^^^^^^^^^^^^^^^^^^^^ In this example we implement a rather naive Read Evaluate Print Loop client over TCP. Let's say we know a server has exposed a simple command line interface over TCP, and would like to interact with it using Akka Streams over TCP. To open an outgoing connection socket we use the ``outgoingConnection`` method: -.. includecode:: code/docs/stream/StreamTcpDocSpec.scala#repl-client +.. includecode:: code/docs/stream/io/StreamTcpDocSpec.scala#repl-client The ``repl`` flow we use to handle the server interaction first prints the servers response, then awaits on input from the command line (this blocking call is used here just for the sake of simplicity) and converts it to a @@ -61,7 +64,7 @@ a separate mapAsync step and have a way to let the server write more data than o these improvements however are left as exercise for the reader. Avoiding deadlocks and liveness issues in back-pressured cycles -=============================================================== +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ When writing such end-to-end back-pressured systems you may sometimes end up in a situation of a loop, in which *either side is waiting for the other one to start the conversation*. One does not need to look far to find examples of such back-pressure loops. In the two examples shown previously, we always assumed that the side we @@ -80,7 +83,7 @@ Thankfully in most situations finding the right spot to start the conversation i to the protocol we are trying to implement using Streams. In chat-like applications, which our examples resemble, it makes sense to make the Server initiate the conversation by emitting a "hello" message: -.. includecode:: code/docs/stream/StreamTcpDocSpec.scala#welcome-banner-chat-server +.. includecode:: code/docs/stream/io/StreamTcpDocSpec.scala#welcome-banner-chat-server The way we constructed a :class:`Flow` using a :class:`PartialFlowGraph` is explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`, however the basic concepts is rather simple– @@ -93,3 +96,28 @@ logic in Flows and attaching those to :class:`StreamIO` in order to implement yo In this example both client and server may need to close the stream based on a parsed command command - ``BYE`` in the case of the server, and ``q`` in the case of the client. This is implemented by using a custom :class:`PushStage` (see :ref:`stream-using-push-pull-stage-scala`) which completes the stream once it encounters such command. + +Streaming File IO +================= + +Akka Streams provide simple Sources and Sinks that can work with :class:`ByteString` instances to perform IO operations +on files. + +.. note:: + Since the current version of Akka (``2.3.x``) needs to support JDK6, the currently provided File IO implementations + are not able to utilise Asynchronous File IO operations, as these were introduced in JDK7 (and newer). + Once Akka is free to require JDK8 (from ``2.4.x``) these implementations will be updated to make use of the + new NIO APIs (i.e. :class:`AsynchronousFileChannel`). + +Streaming data from a file is as easy as defining a `SynchronousFileSource` given a target file, and an optional +``chunkSize`` which determines the buffer size determined as one "element" in such stream: + +.. includecode:: code/docs/stream/io/StreamFileDocSpec.scala#file-source + +Please note that these processing stages are backed by Actors and by default are configured to run on a pre-configured +threadpool-backed dispatcher dedicated for File IO. This is very important as it isolates the blocking file IO operations from the rest +of the ActorSystem allowing each dispatcher to be utilised in the most efficient way. If you want to configure a custom +dispatcher for file IO operations globally, you can do so by changing the ``akka.strea.file-io-dispatcher``, +or for a specific stage by spefifying a custom Dispatcher in code, like this: + +.. includecode:: code/docs/stream/io/StreamFileDocSpec.scala#custom-dispatcher-code diff --git a/akka-http/src/main/resources/reference.conf b/akka-http/src/main/resources/reference.conf index bde9f1484d..eba81d60a1 100644 --- a/akka-http/src/main/resources/reference.conf +++ b/akka-http/src/main/resources/reference.conf @@ -39,5 +39,5 @@ akka.http.routing { # Fully qualified config path which holds the dispatcher configuration # to be used by FlowMaterialiser when creating Actors for IO operations. - file-io-dispatcher = ${akka.io.tcp.file-io-dispatcher} + file-io-dispatcher = ${akka.stream.file-io-dispatcher} } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/ActorSystemLifecycle.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/ActorSystemLifecycle.scala index 90984a797a..9bbf0a805b 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/ActorSystemLifecycle.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/ActorSystemLifecycle.scala @@ -16,7 +16,7 @@ import org.testng.annotations.BeforeClass trait ActorSystemLifecycle { - private var _system: ActorSystem = _ + protected var _system: ActorSystem = _ final def system: ActorSystem = _system diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SynchronousFilePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SynchronousFilePublisherTest.scala new file mode 100644 index 0000000000..682f981c42 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SynchronousFilePublisherTest.scala @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import java.io.{ File, FileWriter } + +import akka.actor.ActorSystem +import akka.event.Logging +import akka.stream.io.SynchronousFileSource +import akka.stream.scaladsl.Sink +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.testkit.{ EventFilter, TestEvent } +import akka.util.ByteString +import org.reactivestreams.Publisher +import org.testng.annotations.{ AfterClass, BeforeClass } + +class SynchronousFilePublisherTest extends AkkaPublisherVerification[ByteString] { + + val ChunkSize = 256 + val Elements = 1000 + + @BeforeClass + override def createActorSystem(): Unit = { + _system = ActorSystem(Logging.simpleName(getClass), StreamTestKit.UnboundedMailboxConfig.withFallback(AkkaSpec.testConf)) + _system.eventStream.publish(TestEvent.Mute(EventFilter[RuntimeException]("Test exception"))) + } + + val file = { + val f = File.createTempFile("file-source-tck", ".tmp") + val chunk = "x" * ChunkSize + val fw = new FileWriter(f) + for (i ← 1 to Elements) fw.append(chunk) + fw.close() + f + } + + def createPublisher(elements: Long): Publisher[ByteString] = + SynchronousFileSource(file, chunkSize = 512) + .take(elements) + .runWith(Sink.publisher) + + @AfterClass + def after = file.delete() + + override def maxElementsFromPublisher(): Long = Elements +} + diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala index aec520baad..b3b01028b6 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala @@ -3,9 +3,12 @@ */ package akka.stream.testkit +import akka.stream.FlowMaterializer +import com.typesafe.config.ConfigFactory + import scala.language.existentials import akka.actor.ActorSystem -import akka.stream.impl.{ EmptyPublisher, ErrorPublisher } +import akka.stream.impl.{ StreamSupervisor, ActorFlowMaterializerImpl, EmptyPublisher, ErrorPublisher } import akka.testkit.TestProbe import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.concurrent.duration.FiniteDuration @@ -14,6 +17,9 @@ import scala.util.control.NoStackTrace object StreamTestKit { + /** Sets the default-mailbox to the usual [[akka.dispatch.UnboundedMailbox]] instead of [[StreamTestDefaultMailbox]]. */ + val UnboundedMailboxConfig = ConfigFactory.parseString("""akka.actor.default-mailbox.mailbox-type = "akka.dispatch.UnboundedMailbox"""") + /** * Subscribes the subscriber and completes after the first request. */ @@ -174,4 +180,20 @@ object StreamTestKit { } case class TE(message: String) extends RuntimeException(message) with NoStackTrace + + def checkThatAllStagesAreStopped[T](block: ⇒ T)(implicit materializer: FlowMaterializer): T = + materializer match { + case impl: ActorFlowMaterializerImpl ⇒ + impl.supervisor ! StreamSupervisor.StopChildren + val result = block + val probe = TestProbe()(impl.system) + probe.awaitAssert { + impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref) + val children = probe.expectMsgType[StreamSupervisor.Children].children + assert(children.isEmpty, + s"expected no StreamSupervisor children, but got [${children.mkString(", ")}]") + } + result + case _ ⇒ block + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index f91606181b..7defef30f5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -3,23 +3,16 @@ */ package akka.stream.actor -import akka.actor.ActorRef -import akka.actor.PoisonPill -import akka.actor.Props +import akka.actor.{ ActorRef, PoisonPill, Props } +import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings, ActorOperationAttributes } import akka.stream.scaladsl._ -import akka.stream.ActorFlowMaterializer -import akka.stream.testkit.AkkaSpec -import akka.stream.testkit.StreamTestKit -import akka.testkit.EventFilter -import akka.testkit.ImplicitSender +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } import akka.testkit.TestEvent.Mute -import akka.testkit.TestProbe +import akka.testkit.{ EventFilter, ImplicitSender, TestProbe } + import scala.annotation.tailrec import scala.concurrent.duration._ import scala.util.control.NoStackTrace -import akka.stream.impl.SubscriberSink -import akka.stream.ActorFlowMaterializerSettings -import akka.stream.ActorOperationAttributes object ActorPublisherSpec { @@ -176,6 +169,18 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic s.expectError.getMessage should be("wrong") } + "terminate after signalling error" in { + val probe = TestProbe() + val ref = system.actorOf(testPublisherProps(probe.ref)) + val s = StreamTestKit.SubscriberProbe[String]() + ActorPublisher[String](ref).subscribe(s) + s.expectSubscription + probe.watch(ref) + ref ! Err("wrong") + s.expectError.getMessage should be("wrong") + probe.expectTerminated(ref, 200.millis) + } + "signal error before subscribe" in { val probe = TestProbe() val ref = system.actorOf(testPublisherProps(probe.ref)) @@ -232,6 +237,22 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic s.expectComplete } + "terminate after signalling onComplete" in { + val probe = TestProbe() + val ref = system.actorOf(testPublisherProps(probe.ref)) + val s = StreamTestKit.SubscriberProbe[String]() + ActorPublisher[String](ref).subscribe(s) + val sub = s.expectSubscription + sub.request(3) + probe.expectMsg(TotalDemand(3)) + probe.watch(ref) + ref ! Produce("elem-1") + ref ! Complete + s.expectNext("elem-1") + s.expectComplete + probe.expectTerminated(ref, 200.millis) + } + "signal immediate onComplete" in { val probe = TestProbe() val ref = system.actorOf(testPublisherProps(probe.ref)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala index faf5b3d498..2ae93901fc 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala @@ -6,10 +6,10 @@ package akka.stream.actor import akka.actor.{ Actor, ActorRef, Props } import akka.routing.{ ActorRefRoutee, RoundRobinRoutingLogic, Router } import akka.stream.ActorFlowMaterializer -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{ Sink, Source } import akka.stream.testkit.AkkaSpec import akka.testkit.ImplicitSender +import org.reactivestreams.Subscription import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -31,6 +31,18 @@ object ActorSubscriberSpec { case "ready" ⇒ request(elements = 2) case "boom" ⇒ throw new RuntimeException("boom") with NoStackTrace case "requestAndCancel" ⇒ { request(1); cancel() } + case "cancel" ⇒ cancel() + } + } + + def immediatelyCancelledSubscriberProps(probe: ActorRef): Props = + Props(new ImmediatelyCancelledSubscriber(probe)).withDispatcher("akka.test.stream-dispatcher") + + class ImmediatelyCancelledSubscriber(probe: ActorRef) extends ManualSubscriber(probe) { + override val requestStrategy = ZeroRequestStrategy + override def preStart() = { + cancel() + super.preStart() } } @@ -146,6 +158,27 @@ class ActorSubscriberSpec extends AkkaSpec with ImplicitSender { expectNoMsg(200.millis) } + "terminate after cancel" in { + val ref = Source(1 to 5).runWith(Sink.actorSubscriber(manualSubscriberProps(testActor))) + watch(ref) + ref ! "requestAndCancel" + expectTerminated(ref, 200.millis) + } + + "cancel incoming subscription when cancel() was called before it arrived" in { + val ref = system.actorOf(immediatelyCancelledSubscriberProps(testActor)) + val sub = ActorSubscriber(ref) + watch(ref) + expectNoMsg(200.millis) + + sub.onSubscribe(new Subscription { + override def cancel(): Unit = testActor ! "cancel" + override def request(n: Long): Unit = () + }) + expectMsg("cancel") + expectTerminated(ref, 200.millis) + } + "work with OneByOneRequestStrategy" in { Source(1 to 17).runWith(Sink.actorSubscriber(requestStrategySubscriberProps(testActor, OneByOneRequestStrategy))) for (n ← 1 to 17) expectMsg(OnNext(n)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala new file mode 100644 index 0000000000..5a52051b64 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io + +import java.io.InputStream + +import akka.stream.scaladsl.Sink +import akka.stream.testkit.StreamTestKit._ +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings } +import akka.util.ByteString +import org.scalatest.concurrent.ScalaFutures + +class InputStreamSourceSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxConfig) with ScalaFutures { + + val settings = ActorFlowMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") + implicit val materializer = ActorFlowMaterializer(settings) + + "InputStreamSource" must { + "read bytes from InputStream" in checkThatAllStagesAreStopped { + val f = InputStreamSource(() ⇒ new InputStream { + @volatile var buf = List("a", "b", "c").map(_.charAt(0).toInt) + override def read(): Int = { + buf match { + case head :: tail ⇒ + buf = tail + head + case Nil ⇒ + -1 + } + + } + }) + .runWith(Sink.head) + + f.futureValue should ===(ByteString("abc")) + } + } + +} + diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala new file mode 100644 index 0000000000..c6549b83c3 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala @@ -0,0 +1,66 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io + +import java.io.OutputStream + +import akka.stream.scaladsl.Source +import akka.stream.testkit.StreamTestKit._ +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings } +import akka.testkit.TestProbe +import akka.util.ByteString + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class OutputStreamSinkSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxConfig) { + + val settings = ActorFlowMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") + implicit val materializer = ActorFlowMaterializer(settings) + + "OutputStreamSink" must { + "write bytes to void OutputStream" in checkThatAllStagesAreStopped { + val p = TestProbe() + val datas = List(ByteString("a"), ByteString("c"), ByteString("c")) + + val completion = Source(datas) + .runWith(OutputStreamSink(() ⇒ new OutputStream { + override def write(i: Int): Unit = () + override def write(bytes: Array[Byte]): Unit = p.ref ! ByteString(bytes).utf8String + })) + + p.expectMsg(datas(0).utf8String) + p.expectMsg(datas(1).utf8String) + p.expectMsg(datas(2).utf8String) + Await.ready(completion, 3.seconds) + } + + "close underlying stream when error received" in checkThatAllStagesAreStopped { + val p = TestProbe() + Source.failed(new TE("Boom!")) + .runWith(OutputStreamSink(() ⇒ new OutputStream { + override def write(i: Int): Unit = () + override def close() = p.ref ! "closed" + })) + + p.expectMsg("closed") + } + + "close underlying stream when completion received" in checkThatAllStagesAreStopped { + val p = TestProbe() + Source.empty + .runWith(OutputStreamSink(() ⇒ new OutputStream { + override def write(i: Int): Unit = () + override def write(bytes: Array[Byte]): Unit = p.ref ! ByteString(bytes).utf8String + override def close() = p.ref ! "closed" + })) + + p.expectMsg("closed") + } + + } + +} + diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala new file mode 100644 index 0000000000..27900b0252 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala @@ -0,0 +1,135 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io + +import java.io.File + +import akka.actor.{ ActorCell, ActorSystem, RepointableActorRef } +import akka.stream.scaladsl.Source +import akka.stream.testkit.StreamTestKit._ +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings, ActorOperationAttributes } +import akka.util.{ ByteString, Timeout } + +import scala.collection.mutable.ListBuffer +import scala.concurrent.Await +import scala.concurrent.duration._ + +class SynchronousFileSinkSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxConfig) { + + val settings = ActorFlowMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") + implicit val materializer = ActorFlowMaterializer(settings) + + val TestLines = { + val b = ListBuffer[String]() + b.append("a" * 1000 + "\n") + b.append("b" * 1000 + "\n") + b.append("c" * 1000 + "\n") + b.append("d" * 1000 + "\n") + b.append("e" * 1000 + "\n") + b.append("f" * 1000 + "\n") + b.toList + } + + val TestByteStrings = TestLines.map(ByteString(_)) + + "SynchronousFile Sink" must { + "write lines to a file" in checkThatAllStagesAreStopped { + targetFile { f ⇒ + val completion = Source(TestByteStrings) + .runWith(SynchronousFileSink(f)) + + val size = Await.result(completion, 3.seconds) + size should equal(6006) + checkFileContents(f, TestLines.mkString("")) + } + } + + "by default write into existing file" in checkThatAllStagesAreStopped { + targetFile { f ⇒ + def write(lines: List[String]) = + Source(lines) + .map(ByteString(_)) + .runWith(SynchronousFileSink(f)) + + val completion1 = write(TestLines) + Await.result(completion1, 3.seconds) + + val lastWrite = List("x" * 100) + val completion2 = write(lastWrite) + val written2 = Await.result(completion2, 3.seconds) + + written2 should ===(lastWrite.flatten.length) + checkFileContents(f, lastWrite.mkString("") + TestLines.mkString("").drop(100)) + } + } + + "allow appending to file" in checkThatAllStagesAreStopped { + targetFile { f ⇒ + def write(lines: List[String] = TestLines) = + Source(lines) + .map(ByteString(_)) + .runWith(SynchronousFileSink(f, append = true)) + + val completion1 = write() + val written1 = Await.result(completion1, 3.seconds) + + val lastWrite = List("x" * 100) + val completion2 = write(lastWrite) + val written2 = Await.result(completion2, 3.seconds) + + f.length() should ===(written1 + written2) + checkFileContents(f, TestLines.mkString("") + lastWrite.mkString("") + "\n") + } + } + + "use dedicated file-io-dispatcher by default" in checkThatAllStagesAreStopped { + targetFile { f ⇒ + val sys = ActorSystem("dispatcher-testing", StreamTestKit.UnboundedMailboxConfig) + val mat = ActorFlowMaterializer()(sys) + implicit val timeout = Timeout(3.seconds) + + try { + Source(() ⇒ Iterator.continually(TestByteStrings.head)).runWith(SynchronousFileSink(f))(mat) + + val ref = Await.result(sys.actorSelection("/user/$a/flow-1-2*").resolveOne(), timeout.duration) + ref.asInstanceOf[RepointableActorRef].underlying.asInstanceOf[ActorCell].dispatcher.id should ===("akka.stream.default-file-io-dispatcher") + } finally shutdown(sys) + } + } + + "allow overriding the dispatcher using OperationAttributes" in checkThatAllStagesAreStopped { + targetFile { f ⇒ + val sys = ActorSystem("dispatcher-testing", StreamTestKit.UnboundedMailboxConfig) + val mat = ActorFlowMaterializer()(sys) + implicit val timeout = Timeout(3.seconds) + + try { + Source(() ⇒ Iterator.continually(TestByteStrings.head)) + .to(SynchronousFileSink(f)) + .withAttributes(ActorOperationAttributes.dispatcher("akka.actor.default-dispatcher")) + .run()(mat) + + val ref = Await.result(sys.actorSelection("/user/$a/flow-1-2*").resolveOne(), timeout.duration) + ref.asInstanceOf[RepointableActorRef].underlying.asInstanceOf[ActorCell].dispatcher.id should ===("akka.actor.default-dispatcher") + } finally shutdown(sys) + } + } + + } + + private def targetFile(block: File ⇒ Unit) { + val targetFile = File.createTempFile("synchronous-file-sink", ".tmp") + try block(targetFile) finally targetFile.delete() + } + + def checkFileContents(f: File, contents: String): Unit = { + val s = scala.io.Source.fromFile(f) + val out = s.getLines().mkString("\n") + "\n" + s.close() + out should ===(contents) + } + +} + diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala new file mode 100644 index 0000000000..521b6e56d8 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala @@ -0,0 +1,195 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io + +import java.io.{ File, FileWriter } +import java.util.Random + +import akka.actor.{ ActorCell, RepointableActorRef, ActorSystem } +import akka.stream.io.SynchronousFileSourceSpec.Settings +import akka.stream.scaladsl.Sink +import akka.stream.testkit.StreamTestKit._ +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.{ ActorOperationAttributes, ActorFlowMaterializer, ActorFlowMaterializerSettings, OperationAttributes } +import akka.util.{ Timeout, ByteString } + +import scala.concurrent.Await +import scala.concurrent.duration._ + +object SynchronousFileSourceSpec { + final case class Settings(chunkSize: Int, readAhead: Int) +} + +class SynchronousFileSourceSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxConfig) { + + val settings = ActorFlowMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") + implicit val materializer = ActorFlowMaterializer(settings) + + val TestText = { + ("a" * 1000) + + ("b" * 1000) + + ("c" * 1000) + + ("d" * 1000) + + ("e" * 1000) + + ("f" * 1000) + } + + val testFile = { + val f = File.createTempFile("file-source-spec", ".tmp") + new FileWriter(f).append(TestText).close() + f + } + + val notExistingFile = { + // this way we make sure it doesn't accidentally exist + val f = File.createTempFile("not-existing-file", ".tmp") + f.delete() + f + } + + val LinesCount = 2000 + new Random().nextInt(300) + + val manyLines = { + val f = File.createTempFile(s"file-source-spec-lines_$LinesCount", "tmp") + val w = new FileWriter(f) + (1 to LinesCount).foreach { l ⇒ + w.append("a" * l).append("\n") + } + w.close() + f + } + + "File Source" must { + "read contents from a file" in checkThatAllStagesAreStopped { + val chunkSize = 512 + val bufferAttributes = OperationAttributes.inputBuffer(1, 2) + + val p = SynchronousFileSource(testFile, chunkSize) + .withAttributes(bufferAttributes) + .runWith(Sink.publisher) + val c = StreamTestKit.SubscriberProbe[ByteString]() + p.subscribe(c) + val sub = c.expectSubscription() + + var remaining = TestText + def nextChunk() = { + val (chunk, rest) = remaining.splitAt(chunkSize) + remaining = rest + chunk + } + + sub.request(1) + c.expectNext().utf8String should ===(nextChunk().toString) + sub.request(1) + c.expectNext().utf8String should ===(nextChunk().toString) + c.expectNoMsg(300.millis) + + sub.request(200) + var expectedChunk = nextChunk().toString + while (expectedChunk != "") { + c.expectNext().utf8String should ===(expectedChunk) + expectedChunk = nextChunk().toString + } + sub.request(1) + + c.expectComplete() + } + + "complete only when all contents of a file have been signalled" in checkThatAllStagesAreStopped { + val chunkSize = 256 + val bufferAttributes = OperationAttributes.inputBuffer(4, 8) + + val demandAllButOneChunks = TestText.length / chunkSize - 1 + + val p = SynchronousFileSource(testFile, chunkSize) + .withAttributes(bufferAttributes) + .runWith(Sink.publisher) + + val c = StreamTestKit.SubscriberProbe[ByteString]() + p.subscribe(c) + val sub = c.expectSubscription() + + var remaining = TestText + def nextChunk() = { + val (chunk, rest) = remaining.splitAt(chunkSize) + remaining = rest + chunk + } + + sub.request(demandAllButOneChunks) + for (i ← 1 to demandAllButOneChunks) c.expectNext().utf8String should ===(nextChunk()) + c.expectNoMsg(300.millis) + + sub.request(1) + c.expectNext().utf8String should ===(nextChunk()) + c.expectNoMsg(200.millis) + + sub.request(1) + c.expectNext().utf8String should ===(nextChunk()) + c.expectComplete() + } + + "onError whent trying to read from file which does not exist" in checkThatAllStagesAreStopped { + val p = SynchronousFileSource(notExistingFile).runWith(Sink.publisher) + val c = StreamTestKit.SubscriberProbe[ByteString]() + p.subscribe(c) + + c.expectSubscription() + c.expectError() + } + + List( + Settings(chunkSize = 512, readAhead = 2), + Settings(chunkSize = 512, readAhead = 4), + Settings(chunkSize = 2048, readAhead = 2), + Settings(chunkSize = 2048, readAhead = 4)) foreach { settings ⇒ + import settings._ + + s"count lines in real file (chunkSize = $chunkSize, readAhead = $readAhead)" in { + val s = SynchronousFileSource(manyLines, chunkSize = chunkSize) + .withAttributes(OperationAttributes.inputBuffer(readAhead, readAhead)) + + val f = s.runWith(Sink.fold(0) { case (acc, l) ⇒ acc + l.utf8String.count(_ == '\n') }) + + val lineCount = Await.result(f, 3.seconds) + lineCount should ===(LinesCount) + } + } + + "use dedicated file-io-dispatcher by default" in { + val sys = ActorSystem("dispatcher-testing", StreamTestKit.UnboundedMailboxConfig) + val mat = ActorFlowMaterializer()(sys) + implicit val timeout = Timeout(500.millis) + + try { + SynchronousFileSource(manyLines).runWith(Sink.ignore)(mat) + + val ref = Await.result(sys.actorSelection("/user/$a/flow-*").resolveOne(), timeout.duration) + ref.asInstanceOf[RepointableActorRef].underlying.asInstanceOf[ActorCell].dispatcher.id should ===("akka.stream.default-file-io-dispatcher") + } finally shutdown(sys) + } + + "allow overriding the dispatcher using OperationAttributes" in { + val sys = ActorSystem("dispatcher-testing", StreamTestKit.UnboundedMailboxConfig) + val mat = ActorFlowMaterializer()(sys) + implicit val timeout = Timeout(500.millis) + + try { + SynchronousFileSource(manyLines) + .withAttributes(ActorOperationAttributes.dispatcher("akka.actor.default-dispatcher")) + .runWith(Sink.ignore)(mat) + + val ref = Await.result(sys.actorSelection("/user/$a/flow-*").resolveOne(), timeout.duration) + ref.asInstanceOf[RepointableActorRef].underlying.asInstanceOf[ActorCell].dispatcher.id should ===("akka.actor.default-dispatcher") + } finally shutdown(sys) + } + } + + override def afterTermination(): Unit = { + testFile.delete() + manyLines.delete() + } + +} + diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index 695fefeaf0..02b60dbc16 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -42,5 +42,21 @@ akka { output-burst-limit = 1000 } + # Fully qualified config path which holds the dispatcher configuration + # to be used by FlowMaterialiser when creating Actors for IO operations, + # such as FileSource, FileSink and others. + file-io-dispatcher = "akka.stream.default-file-io-dispatcher" + + default-file-io-dispatcher { + type = "Dispatcher" + executor = "thread-pool-executor" + throughput = 1 + + thread-pool-executor { + core-pool-size-min = 2 + core-pool-size-factor = 2.0 + core-pool-size-max = 16 + } + } } } diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala index bd39bef27c..98e9806767 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala @@ -5,15 +5,13 @@ package akka.stream import java.util.Locale import java.util.concurrent.TimeUnit + import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props } import akka.stream.impl._ -import akka.stream.scaladsl.RunnableFlow -import com.typesafe.config.Config -import scala.concurrent.duration._ -import akka.actor.Props -import akka.actor.ActorRef import akka.stream.javadsl.japi -import scala.concurrent.ExecutionContextExecutor +import com.typesafe.config.Config + +import scala.concurrent.duration._ object ActorFlowMaterializer { @@ -52,6 +50,7 @@ object ActorFlowMaterializer { val system = actorSystemOf(context) new ActorFlowMaterializerImpl( + system, materializerSettings, system.dispatchers, context.actorOf(StreamSupervisor.props(materializerSettings).withDispatcher(materializerSettings.dispatcher)), @@ -146,6 +145,9 @@ abstract class ActorFlowMaterializer extends FlowMaterializer { def effectiveSettings(opAttr: OperationAttributes): ActorFlowMaterializerSettings + /** INTERNAL API */ + def system: ActorSystem + /** * INTERNAL API: this might become public later */ diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index 56443c002b..73c95e6b3f 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -198,14 +198,18 @@ trait ActorPublisher[T] extends Actor { /** * Complete the stream. After that you are not allowed to * call [[#onNext]], [[#onError]] and [[#onComplete]]. + * + * After signalling completion the Actor will then stop itself as it has completed the protocol. + * When [[#onComplete]] is called before any [[Subscriber]] has had the chance to subscribe + * to this [[ActorPublisher]] the completion signal (and therefore stopping of the Actor as well) + * will be delayed until such [[Subscriber]] arrives. */ def onComplete(): Unit = lifecycleState match { case Active | PreSubscriber ⇒ lifecycleState = Completed - if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives - try tryOnComplete(subscriber) finally { - subscriber = null // not used after onComplete - } + if (subscriber ne null) { + try tryOnComplete(subscriber) finally context.stop(self) + } // otherwise onComplete will be called when the subscription arrives case Completed ⇒ throw new IllegalStateException("onComplete must only be called once") case _: ErrorEmitted ⇒ @@ -216,13 +220,18 @@ trait ActorPublisher[T] extends Actor { /** * Terminate the stream with failure. After that you are not allowed to * call [[#onNext]], [[#onError]] and [[#onComplete]]. + * + * After signalling the Error the Actor will then stop itself as it has completed the protocol. + * When [[#onError]] is called before any [[Subscriber]] has had the chance to subscribe + * to this [[ActorPublisher]] the error signal (and therefore stopping of the Actor as well) + * will be delayed until such [[Subscriber]] arrives. */ def onError(cause: Throwable): Unit = lifecycleState match { case Active | PreSubscriber ⇒ lifecycleState = ErrorEmitted(cause) - if (subscriber ne null) // otherwise onError will be called when the subscription arrives - try tryOnError(subscriber, cause) finally - subscriber = null // not used after onError + if (subscriber ne null) { + try tryOnError(subscriber, cause) finally context.stop(self) + } // otherwise onError will be called when the subscription arrives case _: ErrorEmitted ⇒ throw new IllegalStateException("onError must only be called once") case Completed ⇒ @@ -255,9 +264,11 @@ trait ActorPublisher[T] extends Actor { lifecycleState = Active tryOnSubscribe(sub, new ActorPublisherSubscription(self)) case ErrorEmitted(cause) ⇒ + context.stop(self) tryOnSubscribe(sub, CancelledSubscription) tryOnError(sub, cause) case Completed ⇒ + context.stop(self) tryOnSubscribe(sub, CancelledSubscription) tryOnComplete(sub) case Active | Canceled ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala index b796a9c540..f57c67c91a 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala @@ -192,9 +192,10 @@ trait ActorSubscriber extends Actor { case OnSubscribe(sub) ⇒ if (subscription.isEmpty) { subscription = Some(sub) - if (_canceled) + if (_canceled) { + context.stop(self) sub.cancel() - else if (requested != 0) + } else if (requested != 0) sub.request(remainingRequested) } else sub.cancel() @@ -260,13 +261,22 @@ trait ActorSubscriber extends Actor { } /** - * Cancel upstream subscription. No more elements will - * be delivered after cancel. + * Cancel upstream subscription. + * No more elements will be delivered after cancel. + * + * The [[ActorSubscriber]] will be stopped immediatly after signalling cancelation. + * In case the upstream subscription has not yet arrived the Actor will stay alive + * until a subscription arrives, cancel it and then stop itself. */ protected def cancel(): Unit = if (!_canceled) { - subscription.foreach(_.cancel()) - _canceled = true + subscription match { + case Some(s) ⇒ + context.stop(self) + s.cancel() + case _ ⇒ + _canceled = true // cancel will be signalled once a subscription arrives + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index 8af7083ca4..d43545c925 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -27,7 +27,8 @@ import scala.concurrent.{ Await, ExecutionContextExecutor } /** * INTERNAL API */ -private[akka] case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterializerSettings, +private[akka] case class ActorFlowMaterializerImpl(override val system: ActorSystem, + override val settings: ActorFlowMaterializerSettings, dispatchers: Dispatchers, supervisor: ActorRef, flowNameCounter: AtomicLong, @@ -244,6 +245,13 @@ private[akka] object StreamSupervisor { def props(settings: ActorFlowMaterializerSettings): Props = Props(new StreamSupervisor(settings)) final case class Materialize(props: Props, name: String) extends DeadLetterSuppression + + /** Testing purpose */ + final case object GetChildren + /** Testing purpose */ + final case class Children(children: Set[ActorRef]) + /** Testing purpose */ + final case object StopChildren } private[akka] class StreamSupervisor(settings: ActorFlowMaterializerSettings) extends Actor { @@ -255,6 +263,8 @@ private[akka] class StreamSupervisor(settings: ActorFlowMaterializerSettings) ex case Materialize(props, name) ⇒ val impl = context.actorOf(props, name) sender() ! impl + case GetChildren ⇒ sender() ! Children(context.children.toSet) + case StopChildren ⇒ context.children.foreach(context.stop) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala similarity index 98% rename from akka-stream/src/main/scala/akka/stream/impl/Sources.scala rename to akka-stream/src/main/scala/akka/stream/impl/Modules.scala index 869144bb73..b7abbfb3eb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -3,18 +3,18 @@ */ package akka.stream.impl +import java.io.{ InputStream, File } import java.util.concurrent.atomic.AtomicBoolean import akka.actor.{ ActorRef, Cancellable, PoisonPill, Props } +import akka.stream.ActorOperationAttributes.Dispatcher import akka.stream.impl.StreamLayout.Module -import akka.stream.OperationAttributes -import akka.stream.{ Outlet, OverflowStrategy, Shape, SourceShape } +import akka.stream._ +import akka.util.ByteString import org.reactivestreams._ import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } import scala.util.{ Failure, Success } -import akka.stream.MaterializationContext -import akka.stream.ActorFlowMaterializer /** * INTERNAL API diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 5e425e326b..c309e7a5ca 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -3,13 +3,17 @@ */ package akka.stream.impl +import java.io.File import java.util.concurrent.atomic.AtomicReference import akka.actor.{ ActorRef, Props } +import akka.stream.ActorOperationAttributes.Dispatcher import akka.stream.impl.StreamLayout.Module import akka.stream.OperationAttributes import akka.stream.{ Inlet, Shape, SinkShape } +import akka.util.ByteString import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.annotation.unchecked.uncheckedVariance +import scala.collection.immutable import scala.concurrent.{ Future, Promise } import akka.stream.MaterializationContext import akka.stream.ActorFlowMaterializer diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 73dd93339d..3edb24b4e2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -54,13 +54,13 @@ private[stream] object Stages { val flexiMerge = name("flexiMerge") val flexiRoute = name("flexiRoute") val identityJunction = name("identityJunction") + val repeat = name("repeat") val publisherSource = name("publisherSource") val iterableSource = name("iterableSource") val futureSource = name("futureSource") val tickSource = name("tickSource") val singleSource = name("singleSource") - val repeat = name("repeat") val emptySource = name("emptySource") val lazyEmptySource = name("lazyEmptySource") val failedSource = name("failedSource") @@ -69,6 +69,8 @@ private[stream] object Stages { val subscriberSource = name("subscriberSource") val actorPublisherSource = name("actorPublisherSource") val actorRefSource = name("actorRefSource") + val synchronousFileSource = name("synchronousFileSource") + val inputStreamSource = name("inputStreamSource") val subscriberSink = name("subscriberSink") val cancelledSink = name("cancelledSink") @@ -78,6 +80,8 @@ private[stream] object Stages { val ignoreSink = name("ignoreSink") val actorRefSink = name("actorRefSink") val actorSubscriberSink = name("actorSubscriberSink") + val synchronousFileSink = name("synchronousFileSink") + val outputStreamSink = name("outputStreamSink") } import DefaultAttributes._ diff --git a/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala b/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala new file mode 100644 index 0000000000..415d2398b3 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io + +import java.io.InputStream + +import akka.stream.io.impl.InputStreamSource +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.Source._ +import akka.stream.{ OperationAttributes, javadsl } +import akka.util.ByteString + +import scala.concurrent.Future + +object InputStreamSource { + + final val DefaultChunkSize = 8192 + final val DefaultAttributes = OperationAttributes.name("inputStreamSource") + + /** + * Creates a Source that will pull data out of the given input stream. + * Emitted elements are `chunkSize` sized [[ByteString]] elements. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + */ + def apply(createInputStream: () ⇒ InputStream, chunkSize: Int = DefaultChunkSize): Source[ByteString, Future[Long]] = + new Source(new InputStreamSource(createInputStream, chunkSize, DefaultAttributes, shape("InputStreamSource"))) + + /** + * Java API + * + * Creates a Source that will pull data out of the given input stream. + * Emitted elements are [[ByteString]] elements, chunked by default by [[DefaultChunkSize]] bytes. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + */ + def create(createInputStream: javadsl.japi.Creator[InputStream]): javadsl.Source[ByteString, Future[Long]] = + create(createInputStream, DefaultChunkSize) + + /** + * Java API + * + * Creates a Source that will pull data out of the given input stream. + * Emitted elements are `chunkSize` sized [[ByteString]] elements. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + */ + def create(createInputStream: javadsl.japi.Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, Future[Long]] = + apply(() ⇒ createInputStream.create(), chunkSize).asJava + +} diff --git a/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala b/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala new file mode 100644 index 0000000000..896d69e9f9 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2014-2015 Typesafe Inc. + */ +package akka.stream.io + +import java.io.OutputStream + +import akka.stream.io.impl.OutputStreamSink +import akka.stream.scaladsl.Sink +import akka.stream.{ ActorOperationAttributes, OperationAttributes, javadsl } +import akka.util.ByteString + +import scala.concurrent.Future + +/** + * Sink which writes incoming [[ByteString]]s to the given [[OutputStream]]. + */ +object OutputStreamSink { + + final val DefaultAttributes = OperationAttributes.name("outputStreamSink") + + /** + * Sink which writes incoming [[ByteString]]s to the given [[OutputStream]]. + * + * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * + * This source is backed by an Actor which will use the dedicated `akka.stream.file-io-dispatcher`, + * unless configured otherwise by using [[ActorOperationAttributes]]. + */ + def apply(output: () ⇒ OutputStream): Sink[ByteString, Future[Long]] = + new Sink(new OutputStreamSink(output, DefaultAttributes, Sink.shape("OutputStreamSink"))) + + /** + * Java API + * + * Sink which writes incoming [[ByteString]]s to the given [[OutputStream]]. + * + * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + */ + def create(f: javadsl.japi.Creator[OutputStream]): javadsl.Sink[ByteString, Future[Long]] = + apply(() ⇒ f.create()).asJava + +} diff --git a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala new file mode 100644 index 0000000000..de7175b9dd --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2014-2015 Typesafe Inc. + */ +package akka.stream.io + +import java.io.File + +import akka.stream.{ OperationAttributes, javadsl, ActorOperationAttributes } +import akka.stream.io.impl.SynchronousFileSink +import akka.stream.scaladsl.Sink +import akka.util.ByteString + +import scala.concurrent.Future + +/** + * Sink which writes incoming [[ByteString]]s to the given file + */ +object SynchronousFileSink { + + final val DefaultAttributes = OperationAttributes.name("synchronousFileSink") + + /** + * Synchronous (Java 6 compatible) Sink that writes incoming [[ByteString]] elements to the given file. + * + * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * + * This source is backed by an Actor which will use the dedicated `akka.stream.file-io-dispatcher`, + * unless configured otherwise by using [[ActorOperationAttributes]]. + */ + def apply(f: File, append: Boolean = false): Sink[ByteString, Future[Long]] = + new Sink(new SynchronousFileSink(f, append, DefaultAttributes, Sink.shape("SynchronousFileSink"))) + + /** + * Java API + * + * Synchronous (Java 6 compatible) Sink that writes incoming [[ByteString]] elements to the given file. + * Overwrites existing files, if you want to append to an existing file use [[#create(File, Boolean)]] instead. + * + * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * + * This source is backed by an Actor which will use the dedicated `akka.stream.file-io-dispatcher`, + * unless configured otherwise by using [[ActorOperationAttributes]]. + */ + def create(f: File): javadsl.Sink[ByteString, Future[Long]] = + apply(f, append = false).asJava + + /** + * Java API + * + * Synchronous (Java 6 compatible) Sink that writes incoming [[ByteString]] elements to the given file. + * + * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * + * This source is backed by an Actor which will use the dedicated `akka.stream.file-io-dispatcher`, + * unless configured otherwise by using [[ActorOperationAttributes]]. + */ + def appendTo(f: File): javadsl.Sink[ByteString, Future[Long]] = + apply(f, append = true).asJava + +} diff --git a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala new file mode 100644 index 0000000000..fc6dd64ab9 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io + +import java.io.File + +import akka.stream.io.impl.SynchronousFileSource +import akka.stream.scaladsl.Source +import akka.stream.{ ActorOperationAttributes, OperationAttributes, javadsl } +import akka.util.ByteString + +import scala.concurrent.Future + +object SynchronousFileSource { + + final val DefaultChunkSize = 8192 + final val DefaultAttributes = OperationAttributes.name("synchronousFileSource") + + /** + * Creates a synchronous (Java 6 compatible) Source from a Files contents. + * Emitted elements are `chunkSize` sized [[ByteString]] elements. + * + * This source is backed by an Actor which will use the dedicated thread-pool base dispatcher. + * You can configure the default dispatcher for this Source by changing the `akka.stream.file-io-dispatcher` or + * set it for a given Source by using [[ActorOperationAttributes]]. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + */ + def apply(f: File, chunkSize: Int = DefaultChunkSize): Source[ByteString, Future[Long]] = + new Source(new SynchronousFileSource(f, chunkSize, DefaultAttributes, Source.shape("SynchronousFileSource"))) + + /** + * Creates a synchronous (Java 6 compatible) Source from a Files contents. + * Emitted elements are [[ByteString]] elements, chubnked by default by [[DefaultChunkSize]] bytes. + * + * This source is backed by an Actor which will use the dedicated thread-pool base dispatcher. + * You can configure the default dispatcher for this Source by changing the `akka.stream.file-io-dispatcher` or + * set it for a given Source by using [[ActorOperationAttributes]]. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + */ + def create(f: File): javadsl.Source[ByteString, Future[Long]] = + create(f, DefaultChunkSize) + + /** + * Creates a synchronous (Java 6 compatible) Source from a Files contents. + * Emitted elements are `chunkSize` sized [[ByteString]] elements. + * + * This source is backed by an Actor which will use the dedicated thread-pool base dispatcher. + * You can configure the default dispatcher for this Source by changing the `akka.stream.file-io-dispatcher` or + * set it for a given Source by using [[ActorOperationAttributes]]. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + */ + def create(f: File, chunkSize: Int): javadsl.Source[ByteString, Future[Long]] = + apply(f, chunkSize).asJava + +} diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/IOSettings.scala b/akka-stream/src/main/scala/akka/stream/io/impl/IOSettings.scala new file mode 100644 index 0000000000..0681e8ebf7 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/io/impl/IOSettings.scala @@ -0,0 +1,14 @@ +package akka.stream.io.impl + +import akka.stream.ActorOperationAttributes.Dispatcher +import akka.stream.{ ActorFlowMaterializer, MaterializationContext } + +private[stream] object IOSettings { + /** Picks default akka.stream.file-io-dispatcher or the OperationAttributes configured one */ + def fileIoDispatcher(context: MaterializationContext): String = { + val mat = ActorFlowMaterializer.downcast(context.materializer) + context.effectiveAttributes.attributes.collectFirst { case d: Dispatcher ⇒ d.dispatcher } getOrElse { + mat.system.settings.config.getString("akka.stream.file-io-dispatcher") + } + } +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/io/impl/IOSinks.scala new file mode 100644 index 0000000000..aac9015381 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/io/impl/IOSinks.scala @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io.impl + +import java.io.{ File, OutputStream } + +import akka.stream.impl.SinkModule +import akka.stream.impl.StreamLayout.Module +import akka.stream.io.impl.IOSettings._ +import akka.stream.{ ActorFlowMaterializer, MaterializationContext, OperationAttributes, SinkShape } +import akka.util.ByteString + +import scala.concurrent.{ Future, Promise } + +/** + * INTERNAL API + * Creates simple synchronous (Java 6 compatible) Sink which writes all incoming elements to the given file + * (creating it before hand if neccessary). + */ +private[akka] final class SynchronousFileSink(f: File, append: Boolean, val attributes: OperationAttributes, shape: SinkShape[ByteString]) + extends SinkModule[ByteString, Future[Long]](shape) { + + override def create(context: MaterializationContext) = { + val mat = ActorFlowMaterializer.downcast(context.materializer) + val settings = mat.effectiveSettings(context.effectiveAttributes) + + val bytesWrittenPromise = Promise[Long]() + val props = SynchronousFileSubscriber.props(f, bytesWrittenPromise, settings.maxInputBufferSize, append) + val dispatcher = fileIoDispatcher(context) + + val ref = mat.actorOf(context, props.withDispatcher(dispatcher)) + (akka.stream.actor.ActorSubscriber[ByteString](ref), bytesWrittenPromise.future) + } + + override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, Future[Long]] = + new SynchronousFileSink(f, append, attributes, shape) + + override def withAttributes(attr: OperationAttributes): Module = + new SynchronousFileSink(f, append, attr, amendShape(attr)) +} + +/** + * INTERNAL API + * Creates simple synchronous (Java 6 compatible) Sink which writes all incoming elements to the given file + * (creating it before hand if neccessary). + */ +private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, val attributes: OperationAttributes, shape: SinkShape[ByteString]) + extends SinkModule[ByteString, Future[Long]](shape) { + + override def create(context: MaterializationContext) = { + val mat = ActorFlowMaterializer.downcast(context.materializer) + val settings = mat.effectiveSettings(context.effectiveAttributes) + val bytesWrittenPromise = Promise[Long]() + + val os = createOutput() // if it fails, we fail the materialization + + val props = OutputStreamSubscriber.props(os, bytesWrittenPromise, settings.maxInputBufferSize) + + val ref = mat.actorOf(context, props) + (akka.stream.actor.ActorSubscriber[ByteString](ref), bytesWrittenPromise.future) + } + + override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, Future[Long]] = + new OutputStreamSink(createOutput, attributes, shape) + + override def withAttributes(attr: OperationAttributes): Module = + new OutputStreamSink(createOutput, attr, amendShape(attr)) +} diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/IOSources.scala b/akka-stream/src/main/scala/akka/stream/io/impl/IOSources.scala new file mode 100644 index 0000000000..4678c0de61 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/io/impl/IOSources.scala @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io.impl + +import java.io.{ File, InputStream } + +import akka.stream._ +import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.{ ErrorPublisher, SourceModule } +import akka.util.ByteString +import org.reactivestreams._ + +import scala.concurrent.{ Future, Promise } + +/** + * INTERNAL API + * Creates simple synchronous (Java 6 compatible) Source backed by the given file. + */ +private[akka] final class SynchronousFileSource(f: File, chunkSize: Int, val attributes: OperationAttributes, shape: SourceShape[ByteString]) + extends SourceModule[ByteString, Future[Long]](shape) { + override def create(context: MaterializationContext) = { + val mat = ActorFlowMaterializer.downcast(context.materializer) + val settings = mat.effectiveSettings(context.effectiveAttributes) + + val bytesReadPromise = Promise[Long]() + val props = SynchronousFilePublisher.props(f, bytesReadPromise, chunkSize, settings.initialInputBufferSize, settings.maxInputBufferSize) + val dispatcher = IOSettings.fileIoDispatcher(context) + + val ref = mat.actorOf(context, props.withDispatcher(dispatcher)) + + (akka.stream.actor.ActorPublisher[ByteString](ref), bytesReadPromise.future) + } + + override protected def newInstance(shape: SourceShape[ByteString]): SourceModule[ByteString, Future[Long]] = + new SynchronousFileSource(f, chunkSize, attributes, shape) + + override def withAttributes(attr: OperationAttributes): Module = + new SynchronousFileSource(f, chunkSize, attr, amendShape(attr)) +} + +/** + * INTERNAL API + * Source backed by the given input stream. + */ +private[akka] final class InputStreamSource(createInputStream: () ⇒ InputStream, chunkSize: Int, val attributes: OperationAttributes, shape: SourceShape[ByteString]) + extends SourceModule[ByteString, Future[Long]](shape) { + override def create(context: MaterializationContext) = { + val mat = ActorFlowMaterializer.downcast(context.materializer) + val settings = mat.effectiveSettings(context.effectiveAttributes) + val bytesReadPromise = Promise[Long]() + + val pub = try { + val is = createInputStream() // can throw, i.e. FileNotFound + + val props = InputStreamPublisher.props(is, bytesReadPromise, chunkSize, settings.initialInputBufferSize, settings.maxInputBufferSize) + + val ref = mat.actorOf(context, props) + akka.stream.actor.ActorPublisher[ByteString](ref) + } catch { + case ex: Exception ⇒ + bytesReadPromise.failure(ex) + ErrorPublisher(ex, attributes.nameOrDefault("inputStreamSource")).asInstanceOf[Publisher[ByteString]] + } + + (pub, bytesReadPromise.future) + } + + override protected def newInstance(shape: SourceShape[ByteString]): SourceModule[ByteString, Future[Long]] = + new InputStreamSource(createInputStream, chunkSize, attributes, shape) + + override def withAttributes(attr: OperationAttributes): Module = + new InputStreamSource(createInputStream, chunkSize, attr, amendShape(attr)) +} diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/InputStreamPublisher.scala b/akka-stream/src/main/scala/akka/stream/io/impl/InputStreamPublisher.scala new file mode 100644 index 0000000000..b866b3d1ad --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/io/impl/InputStreamPublisher.scala @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io.impl + +import java.io.InputStream + +import akka.actor.{ ActorLogging, DeadLetterSuppression, Props } +import akka.io.DirectByteBufferPool +import akka.stream.actor.ActorPublisherMessage +import akka.util.ByteString +import akka.util.ByteString.ByteString1C + +import scala.annotation.tailrec +import scala.concurrent.Promise + +/** INTERNAL API */ +private[akka] object InputStreamPublisher { + + def props(is: InputStream, completionPromise: Promise[Long], chunkSize: Int, initialBuffer: Int, maxBuffer: Int): Props = { + require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)") + require(initialBuffer > 0, s"initialBuffer must be > 0 (was $initialBuffer)") + require(maxBuffer >= initialBuffer, s"maxBuffer must be >= initialBuffer (was $maxBuffer)") + + Props(classOf[InputStreamPublisher], is, completionPromise, chunkSize, initialBuffer, maxBuffer) + } + + private final case object Continue extends DeadLetterSuppression +} + +/** INTERNAL API */ +private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Promise[Long], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) + extends akka.stream.actor.ActorPublisher[ByteString] + with ActorLogging { + + // TODO possibly de-duplicate with SynchronousFilePublisher? + + import InputStreamPublisher._ + + val buffs = new DirectByteBufferPool(chunkSize, maxBuffer) + var eofReachedAtOffset = Long.MinValue + + var readBytesTotal = 0L + var availableChunks: Vector[ByteString] = Vector.empty + + override def preStart() = { + try { + readAndSignal(initialBuffer) + } catch { + case ex: Exception ⇒ + onError(ex) + } + + super.preStart() + } + + def receive = { + case ActorPublisherMessage.Request(elements) ⇒ readAndSignal(maxBuffer) + case Continue ⇒ readAndSignal(maxBuffer) + case ActorPublisherMessage.Cancel ⇒ context.stop(self) + } + + def readAndSignal(readAhead: Int): Unit = + if (isActive) { + // signal from available buffer right away + signalOnNexts() + + // read chunks until readAhead is fulfilled + while (availableChunks.length < readAhead && !eofEncountered && isActive) + loadChunk() + + if (totalDemand > 0) self ! Continue + else if (availableChunks.isEmpty) signalOnNexts() + } + + @tailrec private def signalOnNexts(): Unit = + if (availableChunks.nonEmpty) { + if (totalDemand > 0) { + val ready = availableChunks.head + availableChunks = availableChunks.tail + + onNext(ready) + + if (totalDemand > 0) signalOnNexts() + } + } else if (eofEncountered) onComplete() + + /** BLOCKING I/O READ */ + def loadChunk() = try { + val arr = Array.ofDim[Byte](chunkSize) + + // blocking read + val readBytes = is.read(arr) + + readBytes match { + case -1 ⇒ + // had nothing to read into this chunk + eofReachedAtOffset = readBytes + log.debug("No more bytes available to read (got `-1` or `0` from `read`), marking final bytes of file @ " + eofReachedAtOffset) + + case _ ⇒ + readBytesTotal += readBytes + if (readBytes == chunkSize) availableChunks :+= ByteString1C(arr) + else availableChunks :+= ByteString1C(arr).take(readBytes) + + // valid read, continue + } + } catch { + case ex: Exception ⇒ + onError(ex) + } + + private final def eofEncountered: Boolean = eofReachedAtOffset != Long.MinValue + + override def postStop(): Unit = { + super.postStop() + bytesReadPromise.trySuccess(readBytesTotal) + + if (is ne null) is.close() + } +} diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/OutputStreamSubscriber.scala b/akka-stream/src/main/scala/akka/stream/io/impl/OutputStreamSubscriber.scala new file mode 100644 index 0000000000..75a43faac9 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/io/impl/OutputStreamSubscriber.scala @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io.impl + +import java.io.OutputStream + +import akka.actor.{ ActorLogging, Props } +import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } +import akka.util.ByteString + +import scala.concurrent.Promise + +/** INTERNAL API */ +private[akka] object OutputStreamSubscriber { + def props(os: OutputStream, completionPromise: Promise[Long], bufSize: Int) = { + require(bufSize > 0, "buffer size must be > 0") + Props(classOf[OutputStreamSubscriber], os, completionPromise, bufSize) + } + +} + +/** INTERNAL API */ +private[akka] class OutputStreamSubscriber(os: OutputStream, bytesWrittenPromise: Promise[Long], bufSize: Int) + extends akka.stream.actor.ActorSubscriber + with ActorLogging { + + override protected val requestStrategy = WatermarkRequestStrategy(highWatermark = bufSize) + + private var bytesWritten: Long = 0 + + def receive = { + case ActorSubscriberMessage.OnNext(bytes: ByteString) ⇒ + try { + // blocking write + os.write(bytes.toArray) + bytesWritten += bytes.length + } catch { + case ex: Exception ⇒ + println("ex = " + ex) + bytesWrittenPromise.failure(ex) + cancel() + } + + case ActorSubscriberMessage.OnError(cause) ⇒ + log.error(cause, "Tearing down OutputStreamSink due to upstream error, wrote bytes: {}", bytesWritten) + context.stop(self) + + case ActorSubscriberMessage.OnComplete ⇒ + context.stop(self) + os.flush() + } + + override def postStop(): Unit = { + bytesWrittenPromise.trySuccess(bytesWritten) + + if (os ne null) os.close() + super.postStop() + } +} diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFilePublisher.scala b/akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFilePublisher.scala new file mode 100644 index 0000000000..1b5ff9f363 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFilePublisher.scala @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io.impl + +import java.io.{ File, RandomAccessFile } +import java.nio.ByteBuffer +import java.nio.channels.FileChannel + +import akka.actor.{ ActorLogging, DeadLetterSuppression, Props } +import akka.stream.actor.ActorPublisherMessage +import akka.util.ByteString + +import scala.annotation.tailrec +import scala.concurrent.Promise + +/** INTERNAL API */ +private[akka] object SynchronousFilePublisher { + def props(f: File, completionPromise: Promise[Long], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) = { + require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)") + require(initialBuffer > 0, s"initialBuffer must be > 0 (was $initialBuffer)") + require(maxBuffer >= initialBuffer, s"maxBuffer must be >= initialBuffer (was $maxBuffer)") + + Props(classOf[SynchronousFilePublisher], f, completionPromise, chunkSize, initialBuffer, maxBuffer) + } + + private final case object Continue extends DeadLetterSuppression + +} + +/** INTERNAL API */ +private[akka] class SynchronousFilePublisher(f: File, bytesReadPromise: Promise[Long], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) + extends akka.stream.actor.ActorPublisher[ByteString] + with ActorLogging { + + import SynchronousFilePublisher._ + + var eofReachedAtOffset = Long.MinValue + + var readBytesTotal = 0L + var availableChunks: Vector[ByteString] = Vector.empty // TODO possibly resign read-ahead-ing and make fusable as Stage + + private var raf: RandomAccessFile = _ + private var chan: FileChannel = _ + + override def preStart() = { + try { + raf = new RandomAccessFile(f, "r") // best way to express this in JDK6, OpenOption are available since JDK7 + chan = raf.getChannel + readAndSignal(initialBuffer) + } catch { + case ex: Exception ⇒ + onError(ex) + } + + super.preStart() + } + + def receive = { + case ActorPublisherMessage.Request(elements) ⇒ readAndSignal(maxBuffer) + case Continue ⇒ readAndSignal(maxBuffer) + case ActorPublisherMessage.Cancel ⇒ context.stop(self) + } + + def readAndSignal(readAhead: Int): Unit = + if (isActive) { + // signal from available buffer right away + signalOnNexts() + + // read chunks until readAhead is fulfilled + while (availableChunks.length < readAhead && !eofEncountered && isActive) + loadChunk() + + if (totalDemand > 0) self ! Continue + else if (availableChunks.isEmpty) signalOnNexts() + } + + @tailrec private def signalOnNexts(): Unit = + if (availableChunks.nonEmpty) { + if (totalDemand > 0) { + val ready = availableChunks.head + availableChunks = availableChunks.tail + + onNext(ready) + + if (totalDemand > 0) signalOnNexts() + } + } else if (eofEncountered) onComplete() + + /** BLOCKING I/O READ */ + def loadChunk() = try { + val buf = ByteBuffer.allocate(chunkSize) + + // blocking read + val readBytes = chan.read(buf) + + readBytes match { + case -1 ⇒ + // had nothing to read into this chunk + eofReachedAtOffset = chan.position + log.debug("No more bytes available to read (got `-1` or `0` from `read`), marking final bytes of file @ " + eofReachedAtOffset) + + case _ ⇒ + readBytesTotal += readBytes + availableChunks :+= ByteString(buf.array).take(readBytes) + } + } catch { + case ex: Exception ⇒ + onError(ex) + } + + private final def eofEncountered: Boolean = eofReachedAtOffset != Long.MinValue + + override def postStop(): Unit = { + super.postStop() + bytesReadPromise.trySuccess(readBytesTotal) + + if (chan ne null) chan.close() + if (raf ne null) raf.close() + } +} diff --git a/akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFileSubscriber.scala b/akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFileSubscriber.scala new file mode 100644 index 0000000000..e57356f036 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/io/impl/SynchronousFileSubscriber.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.io.impl + +import java.io.{ File, RandomAccessFile } +import java.nio.channels.FileChannel + +import akka.actor.{ ActorLogging, Props } +import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } +import akka.util.ByteString + +import scala.concurrent.Promise + +/** INTERNAL API */ +private[akka] object SynchronousFileSubscriber { + def props(f: File, completionPromise: Promise[Long], bufSize: Int, append: Boolean) = { + require(bufSize > 0, "buffer size must be > 0") + Props(classOf[SynchronousFileSubscriber], f, completionPromise, bufSize, append) + } + +} + +/** INTERNAL API */ +private[akka] class SynchronousFileSubscriber(f: File, bytesWrittenPromise: Promise[Long], bufSize: Int, append: Boolean) + extends akka.stream.actor.ActorSubscriber + with ActorLogging { + + override protected val requestStrategy = WatermarkRequestStrategy(highWatermark = bufSize) + + private var raf: RandomAccessFile = _ + private var chan: FileChannel = _ + + private var bytesWritten: Long = 0 + + override def preStart(): Unit = try { + raf = new RandomAccessFile(f, "rw") // best way to express this in JDK6, OpenOption are available since JDK7 + chan = raf.getChannel + + // manually supporting appending to files - in Java 7 we could use OpenModes: FileChannel.open(f, openOptions.asJava) + if (append) chan.position(chan.size()) + + super.preStart() + } catch { + case ex: Exception ⇒ + bytesWrittenPromise.failure(ex) + cancel() + } + + def receive = { + case ActorSubscriberMessage.OnNext(bytes: ByteString) ⇒ + try { + bytesWritten += chan.write(bytes.asByteBuffer) + } catch { + case ex: Exception ⇒ + bytesWrittenPromise.failure(ex) + cancel() + } + + case ActorSubscriberMessage.OnError(cause) ⇒ + log.error(cause, "Tearing down SynchronousFileSink({}) due to upstream error", f.getAbsolutePath) + context.stop(self) + + case ActorSubscriberMessage.OnComplete ⇒ + try { + chan.force(true) + } catch { + case ex: Exception ⇒ + bytesWrittenPromise.failure(ex) + } + context.stop(self) + } + + override def postStop(): Unit = { + bytesWrittenPromise.trySuccess(bytesWritten) + + if (chan ne null) chan.close() + if (raf ne null) raf.close() + super.postStop() + } +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index dac8ec73c6..3bca710d64 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -1,17 +1,14 @@ /** - * Copyright (C) 2014 Typesafe Inc. + * Copyright (C) 2015 Typesafe Inc. */ package akka.stream.javadsl -import akka.actor.ActorRef -import akka.actor.Props -import akka.stream.javadsl -import akka.stream.scaladsl -import akka.stream._ -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber -import scala.concurrent.Future +import akka.actor.{ ActorRef, Props } import akka.stream.impl.StreamLayout +import akka.stream.{ javadsl, scaladsl, _ } +import org.reactivestreams.{ Publisher, Subscriber } + +import scala.concurrent.Future import scala.util.Try /** Java API */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index bdfef7f87b..955f81cc79 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -3,12 +3,15 @@ */ package akka.stream.javadsl +import java.io.File import scala.collection.immutable import java.util.concurrent.Callable import akka.actor.{ Cancellable, ActorRef, Props } import akka.japi.Util +import akka.stream.OperationAttributes._ import akka.stream._ -import akka.stream.impl.ActorPublisherSource +import akka.stream.impl.{ ActorPublisherSource, StreamLayout } +import akka.util.ByteString import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import scala.annotation.unchecked.uncheckedVariance diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index b9fe7c1b4c..47cb330e3f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -1,24 +1,22 @@ /** - * Copyright (C) 2014 Typesafe Inc. + * Copyright (C) 2014-2015 Typesafe Inc. */ package akka.stream.scaladsl import akka.stream.javadsl import akka.actor.{ ActorRef, Props } +import akka.stream._ +import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.stage.SyncDirective import akka.stream.{ SinkShape, Inlet, Outlet, Graph, OperationAttributes } import akka.stream.OperationAttributes._ import akka.stream.stage.{ TerminationDirective, Directive, Context, PushStage } import org.reactivestreams.{ Publisher, Subscriber } -import scala.annotation.unchecked.uncheckedVariance -import scala.concurrent.{ Promise, Future } -import scala.util.{ Success, Failure, Try } -import akka.stream.FlowMaterializer -import akka.stream.impl.StreamLayout.Module -import scala.util.control.NonFatal -import akka.stream.Supervision -import akka.stream.stage.SyncDirective + +import scala.concurrent.{ Future, Promise } +import scala.util.{ Failure, Success, Try } /** * A `Sink` is a set of stream processing steps that has one open input and an attached output. @@ -52,7 +50,8 @@ object Sink extends SinkApply { import OperationAttributes.none - private def shape[T](name: String): SinkShape[T] = SinkShape(new Inlet(name + ".in")) + /** INTERNAL API */ + private[stream] def shape[T](name: String): SinkShape[T] = SinkShape(new Inlet(name + ".in")) /** * A graph with the shape of a sink logically is a sink, this method makes diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 675b126dfd..89c4fba1b9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -1,11 +1,16 @@ /** - * Copyright (C) 2014 Typesafe Inc. + * Copyright (C) 2014-2015 Typesafe Inc. */ package akka.stream.scaladsl -import akka.stream.javadsl +import akka.actor.{ ActorRef, Cancellable, Props } +import akka.stream._ import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule } import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, SynchronousIterablePublisher, _ } +import akka.stream.stage.{ Context, PushPullStage, SyncDirective, TerminationDirective } +import org.reactivestreams.{ Publisher, Subscriber } + import akka.stream.{ SourceShape, Inlet, Outlet } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream.stage.{ TerminationDirective, Directive, Context, PushPullStage } @@ -16,6 +21,8 @@ import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, SynchronousIterablePub import org.reactivestreams.Publisher import scala.collection.immutable import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ Future, Promise } +import scala.language.higherKinds import scala.concurrent.{ ExecutionContext, Future } import akka.stream.{ FlowMaterializer, Graph } import akka.stream.impl._ @@ -163,7 +170,8 @@ object Source extends SourceApply { private[stream] def apply[Out, Mat](module: SourceModule[Out, Mat]): Source[Out, Mat] = new Source(module) - private def shape[T](name: String): SourceShape[T] = SourceShape(new Outlet(name + ".out")) + /** INTERNAL API */ + private[stream] def shape[T](name: String): SourceShape[T] = SourceShape(new Outlet(name + ".out")) /** * Helper to create [[Source]] from `Publisher`.