From 125c996fae1b2b3853c60065fbb22526580a2107 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 16 Feb 2016 16:56:06 +0100 Subject: [PATCH] add new Streams Quick Start also move IOResult to akka.stream package --- .../akka/stream/io/FileSourcesBenchmark.scala | 5 +- .../code/docs/stream/QuickStartDocTest.java | 88 +++++++++++ .../TwitterStreamQuickstartDocTest.java | 4 + .../docs/stream/io/StreamFileDocTest.java | 1 - akka-docs/rst/java/persistence.rst | 4 +- .../rst/java/stream/stream-quickstart.rst | 138 +++++++++++++++++- .../code/docs/stream/QuickStartDocSpec.scala | 75 ++++++++++ .../docs/stream/io/StreamFileDocSpec.scala | 1 - akka-docs/rst/scala/persistence.rst | 4 +- .../rst/scala/stream/stream-quickstart.rst | 138 +++++++++++++++++- .../PersistentActorBoundedStashingSpec.scala | 4 +- .../akka/stream/io/OutputStreamSinkTest.java | 1 + .../scala/akka/stream/{io => }/IOResult.scala | 2 +- .../akka/stream/impl/io/FilePublisher.scala | 2 +- .../akka/stream/impl/io/FileSubscriber.scala | 2 +- .../scala/akka/stream/impl/io/IOSinks.scala | 2 +- .../scala/akka/stream/impl/io/IOSources.scala | 2 +- .../stream/impl/io/InputStreamPublisher.scala | 2 +- .../impl/io/OutputStreamSubscriber.scala | 2 +- .../scala/akka/stream/javadsl/FileIO.scala | 2 +- .../stream/javadsl/StreamConverters.scala | 2 +- .../scala/akka/stream/scaladsl/FileIO.scala | 2 +- .../stream/scaladsl/StreamConverters.scala | 2 +- 23 files changed, 459 insertions(+), 26 deletions(-) create mode 100644 akka-docs/rst/java/code/docs/stream/QuickStartDocTest.java create mode 100644 akka-docs/rst/scala/code/docs/stream/QuickStartDocSpec.scala rename akka-stream/src/main/scala/akka/stream/{io => }/IOResult.scala (97%) diff --git a/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala index 776da4c610..93c46dfd33 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala @@ -6,16 +6,15 @@ package akka.stream.io import java.io.{ FileInputStream, File } import java.util.concurrent.TimeUnit - -import akka.{Done, NotUsed} +import akka.{ Done, NotUsed } import akka.actor.ActorSystem import akka.stream.{ Attributes, ActorMaterializer } import akka.stream.scaladsl._ import akka.util.ByteString import org.openjdk.jmh.annotations._ - import scala.concurrent.duration._ import scala.concurrent.{ Promise, Await, Future } +import akka.stream.IOResult /** * Benchmark (bufSize) Mode Cnt Score Error Units diff --git a/akka-docs/rst/java/code/docs/stream/QuickStartDocTest.java b/akka-docs/rst/java/code/docs/stream/QuickStartDocTest.java new file mode 100644 index 0000000000..6a2b49e1d6 --- /dev/null +++ b/akka-docs/rst/java/code/docs/stream/QuickStartDocTest.java @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2016 Typesafe Inc. + */ +package docs.stream; + +import java.io.File; +import java.math.BigInteger; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.junit.*; + +import akka.Done; +import akka.NotUsed; +import akka.actor.ActorSystem; +//#imports +import akka.stream.*; +import akka.stream.javadsl.*; +//#imports +import akka.util.ByteString; +import scala.concurrent.duration.Duration; + +/** + * This class is not meant to be run as a test in the test suite, but it + * is set up such that it can be run interactively from within an IDE. + */ +public class QuickStartDocTest { + + static + //#create-materializer + final ActorSystem system = ActorSystem.create("QuickStart"); + final Materializer materializer = ActorMaterializer.create(system); + //#create-materializer + + @AfterClass + public static void teardown() { + system.terminate(); + } + + @Test + public void demonstrateSource() throws InterruptedException, ExecutionException { + //#create-source + final Source source = Source.range(1, 100); + //#create-source + + //#run-source + source.runForeach(i -> System.out.println(i), materializer); + //#run-source + + //#transform-source + final Source factorials = + source + .scan(BigInteger.ONE, (acc, next) -> acc.multiply(BigInteger.valueOf(next))); + + final CompletionStage result = + factorials + .map(num -> ByteString.fromString(num.toString() + "\n")) + .runWith(FileIO.toFile(new File("factorials.txt")), materializer); + //#transform-source + + //#use-transformed-sink + factorials.map(BigInteger::toString).runWith(lineSink("factorial2.txt"), materializer); + //#use-transformed-sink + + //#add-streams + final CompletionStage done = + factorials + .zipWith(Source.range(0, 99), (num, idx) -> String.format("%d! = %s", idx, num)) + .throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping()) + //#add-streams + .take(2) + //#add-streams + .runForeach(s -> System.out.println(s), materializer); + //#add-streams + + done.toCompletableFuture().get(); + } + + //#transform-sink + public Sink> lineSink(String filename) { + return Flow.of(String.class) + .map(s -> ByteString.fromString(s.toString() + "\n")) + .toMat(FileIO.toFile(new File(filename)), Keep.right()); + } + //#transform-sink + +} diff --git a/akka-docs/rst/java/code/docs/stream/TwitterStreamQuickstartDocTest.java b/akka-docs/rst/java/code/docs/stream/TwitterStreamQuickstartDocTest.java index d438048114..1f1d3843c3 100644 --- a/akka-docs/rst/java/code/docs/stream/TwitterStreamQuickstartDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/TwitterStreamQuickstartDocTest.java @@ -9,8 +9,10 @@ import akka.actor.ActorSystem; import akka.dispatch.Foreach; import akka.japi.JavaPartialFunction; import akka.testkit.JavaTestKit; +//#imports import akka.stream.*; import akka.stream.javadsl.*; +//#imports import docs.AbstractJavaTest; import docs.stream.TwitterStreamQuickstartDocTest.Model.Author; import docs.stream.TwitterStreamQuickstartDocTest.Model.Hashtag; @@ -39,6 +41,8 @@ import static docs.stream.TwitterStreamQuickstartDocTest.Model.tweets; @SuppressWarnings("unused") public class TwitterStreamQuickstartDocTest extends AbstractJavaTest { + private static final long serialVersionUID = 1L; + static ActorSystem system; static Materializer mat; diff --git a/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java b/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java index be558fbbf9..bb875ca4ae 100644 --- a/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java @@ -10,7 +10,6 @@ import java.util.concurrent.CompletionStage; import akka.Done; import akka.actor.ActorSystem; import akka.stream.ActorAttributes; -import akka.stream.io.IOResult; import akka.stream.javadsl.Sink; import akka.stream.javadsl.FileIO; import docs.AbstractJavaTest; diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 4f466b50ef..a70fc6aab9 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -118,7 +118,7 @@ about successful state changes by publishing events. When persisting events with ``persist`` it is guaranteed that the persistent actor will not receive further commands between the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` -calls in context of a single command. Incoming messages are :ref:`stashed ` until the ``persist`` +calls in context of a single command. Incoming messages are :ref:`stashed ` until the ``persist`` is completed. If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default), @@ -192,7 +192,7 @@ and before any other received messages. If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. -.. _internal-stash: +.. _internal-stash_java: Internal stash -------------- diff --git a/akka-docs/rst/java/stream/stream-quickstart.rst b/akka-docs/rst/java/stream/stream-quickstart.rst index cacdc00a96..04beb7d5f2 100644 --- a/akka-docs/rst/java/stream/stream-quickstart.rst +++ b/akka-docs/rst/java/stream/stream-quickstart.rst @@ -1,7 +1,141 @@ .. _stream-quickstart-java: -Quick Start Guide: Reactive Tweets -================================== +Quick Start Guide +================= + +A stream usually begins at a source, so this is also how we start an Akka +Stream. Before we create one, we import the full complement of streaming tools: + +.. includecode:: ../code/docs/stream/QuickStartDocTest.java#imports + +Now we will start with a rather simple source, emitting the integers 1 to 100: + +.. includecode:: ../code/docs/stream/QuickStartDocTest.java#create-source + +The :class:`Source` type is parameterized with two types: the first one is the +type of element that this source emits and the second one may signal that +running the source produces some auxiliary value (e.g. a network source may +provide information about the bound port or the peer’s address). Where no +auxiliary information is produced, the type ``akka.NotUsed`` is used—and a +simple range of integers surely falls into this category. + +Having created this source means that we have a description of how to emit the +first 100 natural numbers, but this source is not yet active. In order to get +those numbers out we have to run it: + +.. includecode:: ../code/docs/stream/QuickStartDocTest.java#run-source + +This line will complement the source with a consumer function—in this example +we simply print out the numbers to the console—and pass this little stream +setup to an Actor that runs it. This activation is signaled by having “run” be +part of the method name; there are other methods that run Akka Streams, and +they all follow this pattern. + +You may wonder where the Actor gets created that runs the stream, and you are +probably also asking yourself what this ``materializer`` means. In order to get +this value we first need to create an Actor system: + +.. includecode:: ../code/docs/stream/QuickStartDocTest.java#create-materializer + +There are other ways to create a materializer, e.g. from an +:class:`ActorContext` when using streams from within Actors. The +:class:`Materializer` is a factory for stream execution engines, it is the +thing that makes streams run—you don’t need to worry about any of the details +just now apart from that you need one for calling any of the ``run`` methods on +a :class:`Source`. + +The nice thing about Akka Streams is that the :class:`Source` is just a +description of what you want to run, and like an architect’s blueprint it can +be reused, incorporated into a larger design. We may choose to transform the +source of integers and write it to a file instead: + +.. includecode:: ../code/docs/stream/QuickStartDocTest.java#transform-source + +First we use the ``scan`` combinator to run a computation over the whole +stream: starting with the number 1 (``BigInteger.ONE``) we multiple by each of +the incoming numbers, one after the other; the scan operationemits the initial +value and then every calculation result. This yields the series of factorial +numbers which we stash away as a :class:`Source` for later reuse—it is +important to keep in mind that nothing is actually computed yet, this is just a +description of what we want to have computed once we run the stream. Then we +convert the resulting series of numbers into a stream of :class:`ByteString` +objects describing lines in a text file. This stream is then run by attaching a +file as the receiver of the data. In the terminology of Akka Streams this is +called a :class:`Sink`. :class:`IOResult` is a type that IO operations return +in Akka Streams in order to tell you how many bytes or elements were consumed +and whether the stream terminated normally or exceptionally. + +Reusable Pieces +--------------- + +One of the nice parts of Akka Streams—and something that other stream libraries +do not offer—is that not only sources can be reused like blueprints, all other +elements can be as well. We can take the file-writing :class:`Sink`, prepend +the processing steps necessary to get the :class:`ByteString` elements from +incoming strings and package that up as a reusable piece as well. Since the +language for writing these streams always flows from left to right (just like +plain English), we need a starting point that is like a source but with an +“open” input. In Akka Streams this is called a :class:`Flow`: + +.. includecode:: ../code/docs/stream/QuickStartDocTest.java#transform-sink + +Starting from a flow of strings we convert each to :class:`ByteString` and then +feed to the already known file-writing :class:`Sink`. The resulting blueprint +is a :class:`Sink>`, which means that it +accepts strings as its input and when materialized it will create auxiliary +information of type ``CompletionStage`` (when chaining operations on +a :class:`Source` or :class:`Flow` the type of the auxiliary information—called +the “materialized value”—is given by the leftmost starting point; since we want +to retain what the ``FileIO.toFile`` sink has to offer, we need to say +``Keep.right()``). + +We can use the new and shiny :class:`Sink` we just created by +attaching it to our ``factorials`` source—after a small adaptation to turn the +numbers into strings: + +.. includecode:: ../code/docs/stream/QuickStartDocTest.java#use-transformed-sink + +Time-Based Processing +--------------------- + +Before we start looking at a more involved example we explore the streaming +nature of what Akka Streams can do. Starting from the ``factorials`` source +we transform the stream by zipping it together with another stream, +represented by a :class:`Source` that emits the number 0 to 100: the first +number emitted by the ``factorials`` source is the factorial of zero, the +second is the factorial of one, and so on. We combine these two by forming +strings like ``"3! = 6"``. + +.. includecode:: ../code/docs/stream/QuickStartDocTest.java#add-streams + +All operations so far have been time-independent and could have been performed +in the same fashion on strict collections of elements. The next line +demonstrates that we are in fact dealing with streams that can flow at a +certain speed: we use the ``throttle`` combinator to slow down the stream to 1 +element per second (the second ``1`` in the argument list is the maximum size +of a burst that we want to allow—passing ``1`` means that the first element +gets through immediately and the second then has to wait for one second and so +on). + +If you run this program you will see one line printed per second. One aspect +that is not immediately visible deserves mention, though: if you try and set +the streams to produce a billion numbers each then you will notice that your +JVM does not crash with an OutOfMemoryError, even though you will also notice +that running the streams happens in the background, asynchronously (this is the +reason for the auxiliary information to be provided as a +:class:`CompletionStage`, in the future). The secret that makes this work is +that Akka Streams implicitly implement pervasive flow control, all combinators +respect back-pressure. This allows the throttle combinator to signal to all its +upstream sources of data that it can only accept elements at a certain +rate—when the incoming rate is higher than one per second the throttle +combinator will assert *back-pressure* upstream. + +This is basically all there is to Akka Streams in a nutshell—glossing over the +fact that there are dozens of sources and sinks and many more stream +transformation combinators to choose from, see also :ref:`stages-overview_java`. + +Reactive Tweets +=============== A typical use case for stream processing is consuming a live stream of data that we want to extract or aggregate some other data from. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them. diff --git a/akka-docs/rst/scala/code/docs/stream/QuickStartDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/QuickStartDocSpec.scala new file mode 100644 index 0000000000..3e5bfe0cf6 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/stream/QuickStartDocSpec.scala @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2016 Typesafe Inc. + */ +package docs.stream + +//#imports +import akka.stream._ +import akka.stream.scaladsl._ +//#imports +import akka.{ NotUsed, Done } +import akka.actor.ActorSystem +import akka.util.ByteString + +import org.scalatest._ +import org.scalatest.concurrent._ +import scala.concurrent._ +import scala.concurrent.duration._ +import java.io.File + +class QuickStartDocSpec extends WordSpec with BeforeAndAfterAll with ScalaFutures { + implicit val patience = PatienceConfig(5.seconds) + + //#create-materializer + implicit val system = ActorSystem("QuickStart") + implicit val materializer = ActorMaterializer() + //#create-materializer + + override def afterAll(): Unit = { + system.terminate() + } + + "demonstrate Source" in { + //#create-source + val source: Source[Int, NotUsed] = Source(1 to 100) + //#create-source + + //#run-source + source.runForeach(i => println(i))(materializer) + //#run-source + + //#transform-source + val factorials = source.scan(BigInt(1))((acc, next) => acc * next) + + val result: Future[IOResult] = + factorials + .map(num => ByteString(s"$num\n")) + .runWith(FileIO.toFile(new File("factorials.txt"))) + //#transform-source + + //#use-transformed-sink + factorials.map(_.toString).runWith(lineSink("factorial2.txt")) + //#use-transformed-sink + + //#add-streams + val done: Future[Done] = + factorials + .zipWith(Source(0 to 100))((num, idx) => s"$idx! = $num") + .throttle(1, 1.second, 1, ThrottleMode.shaping) + //#add-streams + .take(3) + //#add-streams + .runForeach(println) + //#add-streams + + done.futureValue + } + + //#transform-sink + def lineSink(filename: String): Sink[String, Future[IOResult]] = + Flow[String] + .map(s => ByteString(s + "\n")) + .toMat(FileIO.toFile(new File(filename)))(Keep.right) + //#transform-sink + +} diff --git a/akka-docs/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala index a672c6059e..8d6280013f 100644 --- a/akka-docs/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala @@ -6,7 +6,6 @@ package docs.stream.io import java.io.File import akka.stream._ -import akka.stream.io.IOResult import akka.stream.scaladsl.{ FileIO, Sink, Source } import akka.stream.testkit.Utils._ import akka.stream.testkit._ diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 5ba40fb2f3..a990a72279 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -102,7 +102,7 @@ about successful state changes by publishing events. When persisting events with ``persist`` it is guaranteed that the persistent actor will not receive further commands between the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` -calls in context of a single command. Incoming messages are :ref:`stashed ` until the ``persist`` +calls in context of a single command. Incoming messages are :ref:`stashed ` until the ``persist`` is completed. If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default), @@ -175,7 +175,7 @@ and before any other received messages. If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure`` is called (logging the error by default) and the actor will be stopped. -.. _internal-stash: +.. _internal-stash_scala: Internal stash -------------- diff --git a/akka-docs/rst/scala/stream/stream-quickstart.rst b/akka-docs/rst/scala/stream/stream-quickstart.rst index 23624a6e4a..5f26b85179 100644 --- a/akka-docs/rst/scala/stream/stream-quickstart.rst +++ b/akka-docs/rst/scala/stream/stream-quickstart.rst @@ -1,7 +1,141 @@ .. _stream-quickstart-scala: -Quick Start Guide: Reactive Tweets -================================== +Quick Start Guide +================= + +A stream usually begins at a source, so this is also how we start an Akka +Stream. Before we create one, we import the full complement of streaming tools: + +.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#imports + +Now we will start with a rather simple source, emitting the integers 1 to 100: + +.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#create-source + +The :class:`Source` type is parameterized with two types: the first one is the +type of element that this source emits and the second one may signal that +running the source produces some auxiliary value (e.g. a network source may +provide information about the bound port or the peer’s address). Where no +auxiliary information is produced, the type ``akka.NotUsed`` is used—and a +simple range of integers surely falls into this category. + +Having created this source means that we have a description of how to emit the +first 100 natural numbers, but this source is not yet active. In order to get +those numbers out we have to run it: + +.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#run-source + +This line will complement the source with a consumer function—in this example +we simply print out the numbers to the console—and pass this little stream +setup to an Actor that runs it. This activation is signaled by having “run” be +part of the method name; there are other methods that run Akka Streams, and +they all follow this pattern. + +You may wonder where the Actor gets created that runs the stream, and you are +probably also asking yourself what this ``materializer`` means. In order to get +this value we first need to create an Actor system: + +.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#create-materializer + +There are other ways to create a materializer, e.g. from an +:class:`ActorContext` when using streams from within Actors. The +:class:`Materializer` is a factory for stream execution engines, it is the +thing that makes streams run—you don’t need to worry about any of the details +just now apart from that you need one for calling any of the ``run`` methods on +a :class:`Source`. The materializer is picked up implicitly if it is omitted +from the ``run`` method call arguments, which we will do in the following. + +The nice thing about Akka Streams is that the :class:`Source` is just a +description of what you want to run, and like an architect’s blueprint it can +be reused, incorporated into a larger design. We may choose to transform the +source of integers and write it to a file instead: + +.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#transform-source + +First we use the ``scan`` combinator to run a computation over the whole +stream: starting with the number 1 (``BigInt(1)``) we multiple by each of +the incoming numbers, one after the other; the scan operation emits the initial +value and then every calculation result. This yields the series of factorial +numbers which we stash away as a :class:`Source` for later reuse—it is +important to keep in mind that nothing is actually computed yet, this is just a +description of what we want to have computed once we run the stream. Then we +convert the resulting series of numbers into a stream of :class:`ByteString` +objects describing lines in a text file. This stream is then run by attaching a +file as the receiver of the data. In the terminology of Akka Streams this is +called a :class:`Sink`. :class:`IOResult` is a type that IO operations return in +Akka Streams in order to tell you how many bytes or elements were consumed and +whether the stream terminated normally or exceptionally. + +Reusable Pieces +--------------- + +One of the nice parts of Akka Streams—and something that other stream libraries +do not offer—is that not only sources can be reused like blueprints, all other +elements can be as well. We can take the file-writing :class:`Sink`, prepend +the processing steps necessary to get the :class:`ByteString` elements from +incoming strings and package that up as a reusable piece as well. Since the +language for writing these streams always flows from left to right (just like +plain English), we need a starting point that is like a source but with an +“open” input. In Akka Streams this is called a :class:`Flow`: + +.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#transform-sink + +Starting from a flow of strings we convert each to :class:`ByteString` and then +feed to the already known file-writing :class:`Sink`. The resulting blueprint +is a :class:`Sink[String, Future[IOResult]]`, which means that it +accepts strings as its input and when materialized it will create auxiliary +information of type ``Future[IOResult]`` (when chaining operations on +a :class:`Source` or :class:`Flow` the type of the auxiliary information—called +the “materialized value”—is given by the leftmost starting point; since we want +to retain what the ``FileIO.toFile`` sink has to offer, we need to say +``Keep.right``). + +We can use the new and shiny :class:`Sink` we just created by +attaching it to our ``factorials`` source—after a small adaptation to turn the +numbers into strings: + +.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#use-transformed-sink + +Time-Based Processing +--------------------- + +Before we start looking at a more involved example we explore the streaming +nature of what Akka Streams can do. Starting from the ``factorials`` source +we transform the stream by zipping it together with another stream, +represented by a :class:`Source` that emits the number 0 to 100: the first +number emitted by the ``factorials`` source is the factorial of zero, the +second is the factorial of one, and so on. We combine these two by forming +strings like ``"3! = 6"``. + +.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#add-streams + +All operations so far have been time-independent and could have been performed +in the same fashion on strict collections of elements. The next line +demonstrates that we are in fact dealing with streams that can flow at a +certain speed: we use the ``throttle`` combinator to slow down the stream to 1 +element per second (the second ``1`` in the argument list is the maximum size +of a burst that we want to allow—passing ``1`` means that the first element +gets through immediately and the second then has to wait for one second and so +on). + +If you run this program you will see one line printed per second. One aspect +that is not immediately visible deserves mention, though: if you try and set +the streams to produce a billion numbers each then you will notice that your +JVM does not crash with an OutOfMemoryError, even though you will also notice +that running the streams happens in the background, asynchronously (this is the +reason for the auxiliary information to be provided as a :class:`Future`). The +secret that makes this work is that Akka Streams implicitly implement pervasive +flow control, all combinators respect back-pressure. This allows the throttle +combinator to signal to all its upstream sources of data that it can only +accept elements at a certain rate—when the incoming rate is higher than one per +second the throttle combinator will assert *back-pressure* upstream. + +This is basically all there is to Akka Streams in a nutshell—glossing over the +fact that there are dozens of sources and sinks and many more stream +transformation combinators to choose from, see also :ref:`stages-overview_scala`. + +Reactive Tweets +=============== A typical use case for stream processing is consuming a live stream of data that we want to extract or aggregate some other data from. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them. diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala index 18e7f90380..3bda28bc7c 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala @@ -1,6 +1,6 @@ /** - * Copyright (C) 2009-2016 Typesafe Inc. - */ + * Copyright (C) 2009-2016 Typesafe Inc. + */ package akka.persistence diff --git a/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSinkTest.java b/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSinkTest.java index ed78e513a4..9209f62e3b 100644 --- a/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSinkTest.java @@ -3,6 +3,7 @@ */ package akka.stream.io; +import akka.stream.IOResult; import akka.stream.StreamTest; import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.stream.javadsl.Source; diff --git a/akka-stream/src/main/scala/akka/stream/io/IOResult.scala b/akka-stream/src/main/scala/akka/stream/IOResult.scala similarity index 97% rename from akka-stream/src/main/scala/akka/stream/io/IOResult.scala rename to akka-stream/src/main/scala/akka/stream/IOResult.scala index c65e2d4e32..ffaacd3f5a 100644 --- a/akka-stream/src/main/scala/akka/stream/io/IOResult.scala +++ b/akka-stream/src/main/scala/akka/stream/IOResult.scala @@ -1,7 +1,7 @@ /** * Copyright (C) 2016 Typesafe Inc. */ -package akka.stream.io +package akka.stream import akka.Done import scala.util.{ Failure, Success, Try } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala index 1fa33ab7fe..8d7d9cf21b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala @@ -10,7 +10,7 @@ import java.nio.channels.FileChannel import akka.Done import akka.actor.{ Deploy, ActorLogging, DeadLetterSuppression, Props } import akka.stream.actor.ActorPublisherMessage -import akka.stream.io.IOResult +import akka.stream.IOResult import akka.util.ByteString import scala.annotation.tailrec diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala index 97c232be32..dbe212ed3a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala @@ -9,7 +9,7 @@ import java.nio.file.StandardOpenOption import akka.Done import akka.actor.{ Deploy, ActorLogging, Props } -import akka.stream.io.IOResult +import akka.stream.IOResult import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } import akka.util.ByteString diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index c8332fdf51..811229d7d8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -5,7 +5,7 @@ package akka.stream.impl.io import java.io.{ File, OutputStream } import java.nio.file.StandardOpenOption -import akka.stream.io.IOResult +import akka.stream.IOResult import akka.stream.impl.SinkModule import akka.stream.impl.StreamLayout.Module import akka.stream.impl.Stages.DefaultAttributes.IODispatcher diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index b116c84da3..0e97a95478 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -7,7 +7,7 @@ import java.io.{ File, InputStream } import akka.stream._ import akka.stream.ActorAttributes.Dispatcher -import akka.stream.io.IOResult +import akka.stream.IOResult import akka.stream.impl.StreamLayout.Module import akka.stream.impl.Stages.DefaultAttributes.IODispatcher import akka.stream.impl.{ ErrorPublisher, SourceModule } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala index c4e93264a2..db3bf6bf61 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala @@ -9,7 +9,7 @@ import akka.Done import akka.actor.{ Deploy, ActorLogging, DeadLetterSuppression, Props } import akka.io.DirectByteBufferPool import akka.stream.actor.ActorPublisherMessage -import akka.stream.io.IOResult +import akka.stream.IOResult import akka.util.ByteString import akka.util.ByteString.ByteString1C diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala index aad2c10caf..fe5712dc08 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala @@ -8,7 +8,7 @@ import java.io.OutputStream import akka.Done import akka.actor.{ Deploy, ActorLogging, Props } import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } -import akka.stream.io.IOResult +import akka.stream.IOResult import akka.util.ByteString import scala.concurrent.Promise diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala index e477df84d9..560134ca00 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala @@ -8,7 +8,7 @@ import java.nio.file.StandardOpenOption import java.nio.file.StandardOpenOption._ import java.util import akka.stream.{ scaladsl, javadsl, ActorAttributes } -import akka.stream.io.IOResult +import akka.stream.IOResult import akka.util.ByteString import java.util.concurrent.CompletionStage diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala index fa9009636a..6820c2a9a6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala @@ -6,7 +6,7 @@ package akka.stream.javadsl import java.io.{ InputStream, OutputStream } import akka.japi.function import akka.stream.{ scaladsl, javadsl, ActorAttributes } -import akka.stream.io.IOResult +import akka.stream.IOResult import akka.util.ByteString import scala.concurrent.duration.FiniteDuration import java.util.concurrent.CompletionStage diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala index 636bbcec97..f699e768c7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala @@ -10,7 +10,7 @@ import java.nio.file.StandardOpenOption._ import akka.stream.ActorAttributes import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.io._ -import akka.stream.io.IOResult +import akka.stream.IOResult import akka.util.ByteString import scala.concurrent.Future diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala index 39bb32f240..1374faa629 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala @@ -6,7 +6,7 @@ package akka.stream.scaladsl import java.io.{ OutputStream, InputStream } import akka.stream.ActorAttributes -import akka.stream.io.IOResult +import akka.stream.IOResult import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.io.{ InputStreamSinkStage, OutputStreamSink, OutputStreamSourceStage, InputStreamSource } import akka.util.ByteString