diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeSeq.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeSeq.scala new file mode 100644 index 0000000000..49a27c9800 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeSeq.scala @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2015 Typesafe + */ +package docs.stream.cookbook + +import akka.stream.scaladsl.{ Sink, Source } + +import scala.collection.immutable +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ + +class RecipeSeq extends RecipeSpec { + + "Recipe for draining a stream into a strict collection" must { + + "work" in { + //#draining-to-seq-unsafe + val result = immutable.Seq[Message]("1", "2", "3") + val myData = Source(result) + + val unsafe: Future[Seq[Message]] = myData.runWith(Sink.seq) // dangerous! + //#draining-to-seq-unsafe + + Await.result(unsafe, 3.seconds) should be(result) + } + + "work together with limit(n)" in { + //#draining-to-seq-safe + val result = List("1", "2", "3") + val myData = Source(result) + val max = 100 + + // OK. Future will fail with a `StreamLimitReachedException` + // if the number of incoming elements is larger than max + val safe1: Future[immutable.Seq[Message]] = myData.limit(max).runWith(Sink.seq) + //#draining-to-seq-safe + + Await.result(safe1, 3.seconds) should be(result) + } + + "work together with take(n)" in { + val result = List("1", "2", "3") + val myData = Source(result) + val max = 100 + + //#draining-to-seq-safe + // OK. Collect up to max elements, then cancel upstream + val safe2: Future[immutable.Seq[Message]] = myData.take(max).runWith(Sink.seq) + //#draining-to-seq-safe + + Await.result(safe2, 3.seconds) should be(result) + } + } + +} diff --git a/akka-docs/rst/java/code/docs/stream/FlowStagesDocTest.java b/akka-docs/rst/java/code/docs/stream/FlowStagesDocTest.java index f486e4e420..bfcb21d29a 100644 --- a/akka-docs/rst/java/code/docs/stream/FlowStagesDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/FlowStagesDocTest.java @@ -156,7 +156,7 @@ public class FlowStagesDocTest { @Test public void demonstrateVariousPushPullStages() throws Exception { final Sink<Integer, CompletionStage<List<Integer>>> sink = - Flow.of(Integer.class).grouped(10).toMat(Sink.head(), Keep.right()); + Flow.of(Integer.class).limit(10).toMat(Sink.seq(), Keep.right()); //#stage-chain final RunnableGraph<CompletionStage<List<Integer>>> runnable = diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeByteStrings.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeByteStrings.java index 14b278fea7..9232d21e42 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeByteStrings.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeByteStrings.java @@ -95,7 +95,7 @@ public class RecipeByteStrings extends RecipeTest { rawBytes.transform(() -> new Chunker(CHUNK_LIMIT)); //#bytestring-chunker2 - CompletionStage<List<ByteString>> chunksFuture = chunksStream.grouped(10).runWith(Sink.head(), mat); + CompletionStage<List<ByteString>> chunksFuture = chunksStream.limit(10).runWith(Sink.seq(), mat); List<ByteString> chunks = chunksFuture.toCompletableFuture().get(3, TimeUnit.SECONDS); @@ -157,7 +157,7 @@ public class RecipeByteStrings extends RecipeTest { ByteString.fromArray(new byte[] { 4, 5, 6 }), ByteString.fromArray(new byte[] { 7, 8, 9, 10 }))); - List<ByteString> got = bytes1.via(limiter).grouped(10).runWith(Sink.head(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS); + List<ByteString> got = bytes1.via(limiter).limit(10).runWith(Sink.seq(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
ByteString acc = ByteString.empty(); for (ByteString b : got) { acc = acc.concat(b); @@ -166,7 +166,7 @@ boolean thrown = false; try { - bytes2.via(limiter).grouped(10).runWith(Sink.head(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS); + bytes2.via(limiter).limit(10).runWith(Sink.seq(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS); } catch (IllegalStateException ex) { thrown = true; } @@ -190,7 +190,7 @@ Source<ByteString, NotUsed> compacted = rawBytes.map(bs -> bs.compact()); //#compacting-bytestrings - List<ByteString> got = compacted.grouped(10).runWith(Sink.head(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS); + List<ByteString> got = compacted.limit(10).runWith(Sink.seq(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS); for (ByteString byteString : got) { assertTrue(byteString.isCompact()); diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeFlattenList.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeFlattenList.java index c04ed24cb7..de2ae6ec15 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeFlattenList.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeFlattenList.java @@ -48,7 +48,7 @@ public class RecipeFlattenList extends RecipeTest { Source<Message, NotUsed> flattened = myData.mapConcat(i -> i); //#flattening-lists - List<Message> got = flattened.grouped(10).runWith(Sink.head(), mat).toCompletableFuture().get(1, TimeUnit.SECONDS); + List<Message> got = flattened.limit(10).runWith(Sink.seq(), mat).toCompletableFuture().get(1, TimeUnit.SECONDS); assertEquals(got.get(0), new Message("1")); assertEquals(got.get(1), new Message("2")); assertEquals(got.get(2), new Message("3")); diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeParseLines.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeParseLines.java index ec502e6506..a25b62ef9a 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeParseLines.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeParseLines.java @@ -50,8 +50,7 @@ public class RecipeParseLines extends RecipeTest { .via(Framing.delimiter(ByteString.fromString("\r\n"), 100, true)) .map(b -> b.utf8String()); //#parse-lines - - lines.grouped(10).runWith(Sink.head(), mat).toCompletableFuture().get(1, TimeUnit.SECONDS); + lines.limit(10).runWith(Sink.seq(), mat).toCompletableFuture().get(1, TimeUnit.SECONDS); } } diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeSeq.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeSeq.java new file mode 100644 index 0000000000..0e66e463d3 --- /dev/null +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeSeq.java @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2015-2016 Typesafe + */ +package docs.stream.javadsl.cookbook; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.testkit.JavaTestKit; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +public class RecipeSeq extends RecipeTest { static ActorSystem system; + @BeforeClass + public static void
setup() { + system = ActorSystem.create("RecipeSeq"); + } + + @AfterClass + public static void tearDown() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + final Materializer mat = ActorMaterializer.create(system); + + @Test + public void drainSourceToList() throws Exception { + new JavaTestKit(system) { + { + //#draining-to-list-unsafe + final Source<String, NotUsed> myData = Source.from(Arrays.asList("1", "2", "3")); + final int MAX_ALLOWED_SIZE = 100; + + final CompletionStage<List<String>> strings = myData.runWith(Sink.seq(), mat); // dangerous! + //#draining-to-list-unsafe + + strings.toCompletableFuture().get(3, TimeUnit.SECONDS); + } + }; + } + + @Test + public void drainSourceToListWithLimit() throws Exception { + new JavaTestKit(system) { + { + //#draining-to-list-safe + final Source<String, NotUsed> myData = Source.from(Arrays.asList("1", "2", "3")); + final int MAX_ALLOWED_SIZE = 100; + + // OK. Future will fail with a `StreamLimitReachedException` + // if the number of incoming elements is larger than max + final CompletionStage<List<String>> strings = + myData.limit(MAX_ALLOWED_SIZE).runWith(Sink.seq(), mat); + //#draining-to-list-safe + + strings.toCompletableFuture().get(1, TimeUnit.SECONDS); + } + }; + } + + @Test + public void drainSourceToListWithTake() throws Exception { + new JavaTestKit(system) { + { + final Source<String, NotUsed> myData = Source.from(Arrays.asList("1", "2", "3")); + final int MAX_ALLOWED_SIZE = 100; + + //#draining-to-list-safe + // OK. Collect up to MAX_ALLOWED_SIZE elements, then cancel upstream + final CompletionStage<List<String>> strings = + myData.take(MAX_ALLOWED_SIZE).runWith(Sink.seq(), mat); + //#draining-to-list-safe + + strings.toCompletableFuture().get(1, TimeUnit.SECONDS); + } + }; + } +} diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeToStrict.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeToStrict.java deleted file mode 100644 index c7e7fde861..0000000000 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeToStrict.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Copyright (C) 2015-2016 Typesafe - */ -package docs.stream.javadsl.cookbook; - -import akka.NotUsed; -import akka.actor.ActorSystem; -import akka.stream.ActorMaterializer; -import akka.stream.Materializer; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import akka.testkit.JavaTestKit; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; - -public class RecipeToStrict extends RecipeTest { - static ActorSystem system; - - @BeforeClass - public static void setup() { - system = ActorSystem.create("RecipeLoggingElements"); - } - - @AfterClass - public static void tearDown() { - JavaTestKit.shutdownActorSystem(system); - system = null; - } - - final Materializer mat = ActorMaterializer.create(system); - - @Test - public void workWithPrintln() throws Exception { - new JavaTestKit(system) { - { - final Source<String, NotUsed> myData = Source.from(Arrays.asList("1", "2", "3")); - final int MAX_ALLOWED_SIZE = 100; - - //#draining-to-list - final CompletionStage<List<String>> strings = myData - .grouped(MAX_ALLOWED_SIZE).runWith(Sink.head(), mat); - //#draining-to-list - - strings.toCompletableFuture().get(3, TimeUnit.SECONDS); - } - }; - } - -} diff --git
a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeWorkerPool.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeWorkerPool.java index 5ff7cb61ae..c90a756ad2 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeWorkerPool.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeWorkerPool.java @@ -73,7 +73,7 @@ public class RecipeWorkerPool extends RecipeTest { Source<Message, NotUsed> processedJobs = data.via(balancer); //#worker-pool2 - CompletionStage<List<String>> future = processedJobs.map(m -> m.msg).grouped(10).runWith(Sink.head(), mat); + CompletionStage<List<String>> future = processedJobs.map(m -> m.msg).limit(10).runWith(Sink.seq(), mat); List<String> got = future.toCompletableFuture().get(1, TimeUnit.SECONDS); assertTrue(got.contains("1 done")); assertTrue(got.contains("2 done")); diff --git a/akka-docs/rst/java/stream/stream-cookbook.rst b/akka-docs/rst/java/stream/stream-cookbook.rst index 5bfae50759..0fb3a475a8 100644 --- a/akka-docs/rst/java/stream/stream-cookbook.rst +++ b/akka-docs/rst/java/stream/stream-cookbook.rst @@ -54,16 +54,20 @@ collection itself, so we can just call ``mapConcat(l -> l)``. Draining a stream to a strict collection ---------------------------------------- -**Situation:** A finite sequence of elements is given as a stream, but a Scala collection is needed instead. +**Situation:** A possibly unbounded sequence of elements is given as a stream, which needs to be collected into a Java collection while ensuring boundedness. -In this recipe we will use the ``grouped`` stream operation that groups incoming elements into a stream of limited -size collections (it can be seen as the almost opposite version of the "Flattening a stream of sequences" recipe -we showed before). By using a ``grouped(MAX_ALLOWED_SIZE)`` we create a stream of groups -with maximum size of ``MaxAllowedSeqSize`` and then we take the first element of this stream by attaching a ``Sink.head()``. What we get is a -:class:`CompletionStage` containing a sequence with all the elements of the original up to ``MAX_ALLOWED_SIZE`` size (further -elements are dropped). +A common situation when working with streams is one where we need to collect incoming elements into a Java collection. +This operation is supported via ``Sink.seq`` which materializes into a ``CompletionStage<List<T>>``. -.. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeToStrict.java#draining-to-list +``limit`` or ``take`` should always be used in conjunction with ``Sink.seq`` in order to guarantee stream boundedness, thus preventing the program from running out of memory. + +For example, this is best avoided: + +.. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeSeq.java#draining-to-list-unsafe + +Rather, use ``limit`` or ``take`` to ensure that the resulting ``List`` will contain only up to ``MAX_ALLOWED_SIZE`` elements: + +..
includecode:: ../code/docs/stream/javadsl/cookbook/RecipeSeq.java#draining-to-list-safe Calculating the digest of a ByteString stream --------------------------------------------- diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/HttpClientExampleSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/HttpClientExampleSpec.scala index 110711cac6..b2208a6a8c 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/HttpClientExampleSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/HttpClientExampleSpec.scala @@ -46,7 +46,6 @@ class HttpClientExampleSpec extends WordSpec with Matchers { implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() - // construct a pool client flow with context type `Int` val poolClientFlow = Http().cachedHostConnectionPool[Int]("akka.io") val responseFuture: Future[(Try[HttpResponse], Int)] = diff --git a/akka-docs/rst/scala/code/docs/stream/BidiFlowDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/BidiFlowDocSpec.scala index e830fb91d6..2182a5b190 100644 --- a/akka-docs/rst/scala/code/docs/stream/BidiFlowDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/BidiFlowDocSpec.scala @@ -152,7 +152,7 @@ class BidiFlowDocSpec extends AkkaSpec with ConversionCheckedTripleEquals { // test it by plugging it into its own inverse and closing the right end val pingpong = Flow[Message].collect { case Ping(id) => Pong(id) } val flow = stack.atop(stack.reversed).join(pingpong) - val result = Source((0 to 9).map(Ping)).via(flow).grouped(20).runWith(Sink.head) + val result = Source((0 to 9).map(Ping)).via(flow).limit(20).runWith(Sink.seq) Await.result(result, 1.second) should ===((0 to 9).map(Pong)) //#compose } @@ -160,14 +160,14 @@ "work when chopped up" in { val stack = codec.atop(framing) val flow = stack.atop(chopUp).atop(stack.reversed).join(Flow[Message].map { case Ping(id) => Pong(id) }) - val f = Source((0 to 9).map(Ping)).via(flow).grouped(20).runWith(Sink.head) + val f = Source((0 to 9).map(Ping)).via(flow).limit(20).runWith(Sink.seq) Await.result(f, 1.second) should ===((0 to 9).map(Pong)) } "work when accumulated" in { val stack = codec.atop(framing) val flow = stack.atop(accumulate).atop(stack.reversed).join(Flow[Message].map { case Ping(id) => Pong(id) }) - val f = Source((0 to 9).map(Ping)).via(flow).grouped(20).runWith(Sink.head) + val f = Source((0 to 9).map(Ping)).via(flow).limit(20).runWith(Sink.seq) Await.result(f, 1.second) should ===((0 to 9).map(Pong)) } diff --git a/akka-docs/rst/scala/code/docs/stream/FlowErrorDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/FlowErrorDocSpec.scala index 888b2182d2..a1c820c72e 100644 --- a/akka-docs/rst/scala/code/docs/stream/FlowErrorDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/FlowErrorDocSpec.scala @@ -80,7 +80,7 @@ class FlowErrorDocSpec extends AkkaSpec { } .withAttributes(ActorAttributes.supervisionStrategy(decider)) val source = Source(List(1, 3, -1, 5, 7)).via(flow) - val result = source.grouped(1000).runWith(Sink.head) + val result = source.limit(1000).runWith(Sink.seq) // the negative element cause the scan stage to be restarted, // i.e.
start from 0 again // result here will be a Future completed with Success(Vector(0, 1, 4, 0, 5, 12)) diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala index efee481c4c..28248ac255 100644 --- a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala +++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala @@ -48,7 +48,7 @@ class RecipeByteStrings extends RecipeSpec { val chunksStream = rawBytes.transform(() => new Chunker(ChunkLimit)) //#bytestring-chunker - val chunksFuture = chunksStream.grouped(10).runWith(Sink.head) + val chunksFuture = chunksStream.limit(10).runWith(Sink.seq) val chunks = Await.result(chunksFuture, 3.seconds) @@ -77,11 +77,11 @@ class RecipeByteStrings extends RecipeSpec { val bytes1 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9))) val bytes2 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9, 10))) - Await.result(bytes1.via(limiter).grouped(10).runWith(Sink.head), 3.seconds) + Await.result(bytes1.via(limiter).limit(10).runWith(Sink.seq), 3.seconds) .fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9)) an[IllegalStateException] must be thrownBy { - Await.result(bytes2.via(limiter).grouped(10).runWith(Sink.head), 3.seconds) + Await.result(bytes2.via(limiter).limit(10).runWith(Sink.seq), 3.seconds) } } @@ -93,7 +93,7 @@ class RecipeByteStrings extends RecipeSpec { val compacted: Source[ByteString, NotUsed] = data.map(_.compact) //#compacting-bytestrings - Await.result(compacted.grouped(10).runWith(Sink.head), 3.seconds).forall(_.isCompact) should be(true) + Await.result(compacted.limit(10).runWith(Sink.seq), 3.seconds).forall(_.isCompact) should be(true) } } diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeFlattenSeq.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeFlattenSeq.scala index 5ffc2e467e..45f48b9bd1 100644 --- a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeFlattenSeq.scala +++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeFlattenSeq.scala @@ -20,7 +20,7 @@ class RecipeFlattenSeq extends RecipeSpec { val flattened: Source[Message, NotUsed] = myData.mapConcat(identity) //#flattening-seqs - Await.result(flattened.grouped(8).runWith(Sink.head), 3.seconds) should be(List("1", "2", "3", "4", "5", "6", "7")) + Await.result(flattened.limit(8).runWith(Sink.seq), 3.seconds) should be(List("1", "2", "3", "4", "5", "6", "7")) } diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeMultiGroupBy.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeMultiGroupBy.scala index bd9163684f..76f1dc667d 100644 --- a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeMultiGroupBy.scala +++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeMultiGroupBy.scala @@ -45,8 +45,8 @@ class RecipeMultiGroupBy extends RecipeSpec { .grouped(10) .mergeSubstreams .map(g => g.head._2.name + g.map(_._1).mkString("[", ", ", "]")) - .grouped(10) - .runWith(Sink.head) + .limit(10) + .runWith(Sink.seq) Await.result(result, 3.seconds).toSet should be(Set( "1[1: a, 1: b, all: c, all: d, 1: e]", diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala index 2e66210920..209553fc92 100644 --- a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala +++ 
b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala @@ -27,7 +27,7 @@ class RecipeParseLines extends RecipeSpec { .map(_.utf8String) //#parse-lines - Await.result(linesStream.grouped(10).runWith(Sink.head), 3.seconds) should be(List( + Await.result(linesStream.limit(10).runWith(Sink.seq), 3.seconds) should be(List( "Hello World\r!", "Hello Akka!", "Hello Streams!", diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala index 494303fb32..15c07825e9 100644 --- a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala +++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala @@ -29,7 +29,7 @@ class RecipeReduceByKey extends RecipeSpec { .mergeSubstreams //#word-count - Await.result(counts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set( + Await.result(counts.limit(10).runWith(Sink.seq), 3.seconds).toSet should be(Set( ("hello", 2), ("world", 1), ("and", 1), @@ -61,7 +61,7 @@ class RecipeReduceByKey extends RecipeSpec { map = (word: String) => 1)((left: Int, right: Int) => left + right)) //#reduce-by-key-general - Await.result(wordCounts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set( + Await.result(wordCounts.limit(10).runWith(Sink.seq), 3.seconds).toSet should be(Set( ("hello", 2), ("world", 1), ("and", 1), diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeToStrict.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeToStrict.scala deleted file mode 100644 index a73c581fc3..0000000000 --- a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeToStrict.scala +++ /dev/null @@ -1,27 +0,0 @@ -package docs.stream.cookbook - -import akka.stream.scaladsl.{ Sink, Source } - -import scala.collection.immutable -import scala.concurrent.{ Await, Future } -import scala.concurrent.duration._ - -class RecipeToStrict extends RecipeSpec { - - "Recipe for draining a stream into a strict collection" must { - - "work" in { - val myData = Source(List("1", "2", "3")) - val MaxAllowedSeqSize = 100 - - //#draining-to-seq - val strict: Future[immutable.Seq[Message]] = - myData.grouped(MaxAllowedSeqSize).runWith(Sink.head) - //#draining-to-seq - - Await.result(strict, 3.seconds) should be(List("1", "2", "3")) - } - - } - -} diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala index ee3d08fafc..afaae9a438 100644 --- a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala +++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala @@ -39,7 +39,7 @@ class RecipeWorkerPool extends RecipeSpec { val processedJobs: Source[Result, NotUsed] = myJobs.via(balancer(worker, 3)) //#worker-pool - Await.result(processedJobs.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set( + Await.result(processedJobs.limit(10).runWith(Sink.seq), 3.seconds).toSet should be(Set( "1 done", "2 done", "3 done", "4 done", "5 done")) } diff --git a/akka-docs/rst/scala/stream/stream-cookbook.rst b/akka-docs/rst/scala/stream/stream-cookbook.rst index 262cdbf7ac..12a22a1de7 100644 --- a/akka-docs/rst/scala/stream/stream-cookbook.rst +++ b/akka-docs/rst/scala/stream/stream-cookbook.rst @@ -54,16 +54,20 @@ collection itself, so we can just call ``mapConcat(identity)``. 
Draining a stream to a strict collection ---------------------------------------- -**Situation:** A finite sequence of elements is given as a stream, but a Scala collection is needed instead. +**Situation:** A possibly unbounded sequence of elements is given as a stream, which needs to be collected into a Scala collection while ensuring boundedness. -In this recipe we will use the ``grouped`` stream operation that groups incoming elements into a stream of limited -size collections (it can be seen as the almost opposite version of the "Flattening a stream of sequences" recipe -we showed before). By using a ``grouped(MaxAllowedSeqSize)`` we create a stream of groups -with maximum size of ``MaxAllowedSeqSize`` and then we take the first element of this stream by attaching a ``Sink.head``. What we get is a -:class:`Future` containing a sequence with all the elements of the original up to ``MaxAllowedSeqSize`` size (further -elements are dropped). +A common situation when working with streams is one where we need to collect incoming elements into a Scala collection. +This operation is supported via ``Sink.seq`` which materializes into a ``Future[Seq[T]]``. -.. includecode:: ../code/docs/stream/cookbook/RecipeToStrict.scala#draining-to-seq +``limit`` or ``take`` should always be used in conjunction with ``Sink.seq`` in order to guarantee stream boundedness, thus preventing the program from running out of memory. + +For example, this is best avoided: + +.. includecode:: ../code/docs/stream/cookbook/RecipeSeq.scala#draining-to-seq-unsafe + +Rather, use ``limit`` or ``take`` to ensure that the resulting ``Seq`` will contain only up to ``max`` elements: + +.. includecode:: ../code/docs/stream/cookbook/RecipeSeq.scala#draining-to-seq-safe Calculating the digest of a ByteString stream --------------------------------------------- diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala index c1bb085683..dbb0cef368 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala @@ -252,7 +252,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { val parser = newParser val result = multiParse(newParser)(Seq(prep(start + manyChunks))) val HttpEntity.Chunked(_, chunks) = result.head.right.get.req.entity - val strictChunks = chunks.grouped(100000).runWith(Sink.head).awaitResult(awaitAtMost) + val strictChunks = chunks.limit(100000).runWith(Sink.seq).awaitResult(awaitAtMost) strictChunks.size shouldEqual numChunks } } @@ -322,7 +322,7 @@ "too-large chunk size" in new Test { Seq(start, """1a2b3c4d5e - |""") should generalMultiParseTo(Right(baseRequest), + |""") should generalMultiParseTo(Right(baseRequest), Left(EntityStreamError(ErrorInfo("HTTP chunk size exceeds the configured limit of 1048576 bytes")))) closeAfterResponseCompletion shouldEqual Seq(false) } @@ -360,10 +360,10 @@ "two Content-Length headers" in new Test { """GET / HTTP/1.1 - |Content-Length: 3 - |Content-Length: 4 - | - |foo""" should parseToError(BadRequest, + |Content-Length: 3 + |Content-Length: 4 + | + |foo""" should parseToError(BadRequest, ErrorInfo("HTTP message must not contain more than one Content-Length
header")) } @@ -374,63 +374,63 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { "HTTP version 1.2" in new Test { """GET / HTTP/1.2 - |""" should parseToError(HTTPVersionNotSupported, + |""" should parseToError(HTTPVersionNotSupported, ErrorInfo("The server does not support the HTTP protocol version used in the request.")) } "with an illegal char in a header name" in new Test { """GET / HTTP/1.1 - |User@Agent: curl/7.19.7""" should parseToError(BadRequest, ErrorInfo("Illegal character '@' in header name")) + |User@Agent: curl/7.19.7""" should parseToError(BadRequest, ErrorInfo("Illegal character '@' in header name")) } "with a too-long header name" in new Test { """|GET / HTTP/1.1 - |UserxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxAgent: curl/7.19.7""" should parseToError( + |UserxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxAgent: curl/7.19.7""" should parseToError( BadRequest, ErrorInfo("HTTP header name exceeds the configured limit of 64 characters")) } "with a too-long header-value" in new Test { """|GET / HTTP/1.1 - |Fancy: 123456789012345678901234567890123""" should parseToError(BadRequest, + |Fancy: 123456789012345678901234567890123""" should parseToError(BadRequest, ErrorInfo("HTTP header value exceeds the configured limit of 32 characters")) } "with an invalid Content-Length header value" in new Test { """GET / HTTP/1.0 - |Content-Length: 1.5 - | - |abc""" should parseToError(BadRequest, ErrorInfo("Illegal `Content-Length` header value")) + |Content-Length: 1.5 + | + |abc""" should parseToError(BadRequest, ErrorInfo("Illegal `Content-Length` header value")) } "with Content-Length > Long.MaxSize" in new Test { // content-length = (Long.MaxValue + 1) * 10, which is 0 when calculated overflow """PUT /resource/yes HTTP/1.1 - |Content-length: 92233720368547758080 - |Host: x - | - |""" should parseToError(400: StatusCode, ErrorInfo("`Content-Length` header value must not exceed 63-bit integer range")) + |Content-length: 92233720368547758080 + |Host: x + | + |""" should parseToError(400: StatusCode, ErrorInfo("`Content-Length` header value must not exceed 63-bit integer range")) } "with an illegal entity using CONNECT" in new Test { """CONNECT /resource/yes HTTP/1.1 - |Transfer-Encoding: chunked - |Host: x - | - |""" should parseToError(422: StatusCode, ErrorInfo("CONNECT requests must not have an entity")) + |Transfer-Encoding: chunked + |Host: x + | + |""" should parseToError(422: StatusCode, ErrorInfo("CONNECT requests must not have an entity")) } "with an illegal entity using HEAD" in new Test { """HEAD /resource/yes HTTP/1.1 - |Content-length: 3 - |Host: x - | - |foo""" should parseToError(422: StatusCode, ErrorInfo("HEAD requests must not have an entity")) + |Content-length: 3 + |Host: x + | + |foo""" should parseToError(422: StatusCode, ErrorInfo("HEAD requests must not have an entity")) } "with an illegal entity using TRACE" in new Test { """TRACE /resource/yes HTTP/1.1 - |Transfer-Encoding: chunked - |Host: x - | - |""" should parseToError(422: StatusCode, ErrorInfo("TRACE requests must not have an entity")) + |Transfer-Encoding: chunked + |Host: x + | + |""" should parseToError(422: StatusCode, ErrorInfo("TRACE requests must not have an entity")) } } } @@ -504,7 +504,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } } .map(strictEqualify) - .grouped(100000).runWith(Sink.head) + .limit(100000).runWith(Sink.seq) .awaitResult(awaitAtMost) protected def parserSettings: ParserSettings = 
ParserSettings(system) @@ -517,7 +517,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } private def compactEntityChunks(data: Source[ChunkStreamPart, Any]): Future[Seq[ChunkStreamPart]] = - data.grouped(100000).runWith(Sink.head) + data.limit(100000).runWith(Sink.seq) .fast.recover { case _: NoSuchElementException ⇒ Nil } def prep(response: String) = response.stripMarginWithNewline("\r\n") diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala index ed0362fa71..dbcc470240 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala @@ -306,7 +306,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { }.concatSubstreams def collectBlocking[T](source: Source[T, Any]): Seq[T] = - Await.result(source.grouped(100000).runWith(Sink.head), 500.millis) + Await.result(source.limit(100000).runWith(Sink.seq), 500.millis) protected def parserSettings: ParserSettings = ParserSettings(system) @@ -323,7 +323,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } private def compactEntityChunks(data: Source[ChunkStreamPart, Any]): Future[Source[ChunkStreamPart, Any]] = - data.grouped(100000).runWith(Sink.head) + data.limit(100000).runWith(Sink.seq) .fast.map(source(_: _*)) .fast.recover { case _: NoSuchElementException ⇒ source() } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/RequestRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/RequestRendererSpec.scala index 9129624057..1760a365e0 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/RequestRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/RequestRendererSpec.scala @@ -325,7 +325,7 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll def renderTo(expected: String): Matcher[HttpRequest] = equal(expected.stripMarginWithNewline("\r\n")).matcher[String] compose { request ⇒ val byteStringSource = renderToSource(RequestRenderingContext(request, Host(serverAddress))) - val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String) + val future = byteStringSource.limit(1000).runWith(Sink.seq).map(_.reduceLeft(_ ++ _).utf8String) Await.result(future, 250.millis) } } diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index d8be965b22..9253d622bc 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -394,7 +394,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll wit private val HttpRequest(POST, uri, List(Accept(Seq(MediaRanges.`*/*`)), Host(_, _), `User-Agent`(_)), Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext() uri shouldEqual Uri(s"http://$hostname:$port/chunked") - Await.result(chunkStream.grouped(5).runWith(Sink.head), 100.millis) shouldEqual chunks + Await.result(chunkStream.limit(5).runWith(Sink.seq), 100.millis) shouldEqual chunks val serverOutSub = serverOut.expectSubscription() serverOutSub.expectRequest() @@ -404,7 +404,7 @@ 
class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll wit clientInSub.request(1) val HttpResponse(StatusCodes.PartialContent, List(Age(42), Server(_), Date(_)), Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`) = clientIn.expectNext() - Await.result(chunkStream2.grouped(1000).runWith(Sink.head), 100.millis) shouldEqual chunks + Await.result(chunkStream2.limit(1000).runWith(Sink.seq), 100.millis) shouldEqual chunks clientOutSub.sendComplete() serverInSub.request(1) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/model/HttpEntitySpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/model/HttpEntitySpec.scala index dbd8b2beeb..d2961df1b3 100755 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/model/HttpEntitySpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/model/HttpEntitySpec.scala @@ -155,7 +155,7 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { def collectBytesTo(bytes: ByteString*): Matcher[HttpEntity] = equal(bytes.toVector).matcher[Seq[ByteString]].compose { entity ⇒ - val future = entity.dataBytes.grouped(1000).runWith(Sink.head) + val future = entity.dataBytes.limit(1000).runWith(Sink.seq) Await.result(future, 250.millis) } diff --git a/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/RouteTestResultComponent.scala b/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/RouteTestResultComponent.scala index 2c0f45327e..500b57fd2e 100644 --- a/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/RouteTestResultComponent.scala +++ b/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/RouteTestResultComponent.scala @@ -95,6 +95,6 @@ trait RouteTestResultComponent { failTest("Request was neither completed nor rejected within " + timeout) private def awaitAllElements[T](data: Source[T, _]): immutable.Seq[T] = - data.grouped(100000).runWith(Sink.headOption).awaitResult(timeout).getOrElse(Nil) + data.limit(100000).runWith(Sink.seq).awaitResult(timeout) } } \ No newline at end of file diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/coding/CoderSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/coding/CoderSpec.scala index e731ebbe26..e9ee051203 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/coding/CoderSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/coding/CoderSpec.scala @@ -120,7 +120,7 @@ abstract class CoderSpec extends WordSpec with CodecSpecSupport with Inspectors val resultBs = Source.single(compressed) .via(Coder.withMaxBytesPerChunk(limit).decoderFlow) - .grouped(4200).runWith(Sink.head) + .limit(4200).runWith(Sink.seq) .awaitResult(1.second) forAll(resultBs) { bs ⇒ diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/RangeDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/RangeDirectivesSpec.scala index 572970e7b0..963b707a5c 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/RangeDirectivesSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/RangeDirectivesSpec.scala @@ -99,7 +99,7 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside { wrs { complete("Some random and not super short entity.") } } ~> check { header[`Content-Range`] should be(None) - val parts = Await.result(responseAs[Multipart.ByteRanges].parts.grouped(1000).runWith(Sink.head), 1.second) + val parts = 
Await.result(responseAs[Multipart.ByteRanges].parts.limit(1000).runWith(Sink.seq), 1.second) parts.size shouldEqual 2 inside(parts(0)) { case Multipart.ByteRanges.BodyPart(range, entity, unit, headers) ⇒ @@ -124,7 +124,7 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside { wrs { complete(HttpEntity.Default(ContentTypes.`text/plain(UTF-8)`, content.length, entityData())) } } ~> check { header[`Content-Range`] should be(None) - val parts = Await.result(responseAs[Multipart.ByteRanges].parts.grouped(1000).runWith(Sink.head), 1.second) + val parts = Await.result(responseAs[Multipart.ByteRanges].parts.limit(1000).runWith(Sink.seq), 1.second) parts.size shouldEqual 2 } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java index 718dcd75b6..a8e14b32a8 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java @@ -142,7 +142,8 @@ public class BidiFlowTest extends StreamTest { return ByteString.fromString("Hello " + arg); } })); - final CompletionStage<List<String>> result = Source.from(list).via(f).grouped(10).runWith(Sink.<List<String>> head(), materializer); + + final CompletionStage<List<String>> result = Source.from(list).via(f).limit(10).runWith(Sink.seq(), materializer); assertEquals(Arrays.asList("Hello 3", "Hello 4", "Hello 5"), result.toCompletableFuture().get(1, TimeUnit.SECONDS)); } @@ -155,7 +156,7 @@ } }).join(bidi); final List<ByteString> inputs = Arrays.asList(ByteString.fromString("1"), ByteString.fromString("2")); - final CompletionStage<List<Long>> result = Source.from(inputs).via(f).grouped(10).runWith(Sink.<List<Long>> head(), materializer); + final CompletionStage<List<Long>> result = Source.from(inputs).via(f).limit(10).runWith(Sink.seq(), materializer); assertEquals(Arrays.asList(3L, 4L), result.toCompletableFuture().get(1, TimeUnit.SECONDS)); } @@ -167,7 +168,7 @@ return arg.toString(); } })); - final CompletionStage<List<String>> result = Source.from(list).via(f).grouped(10).runWith(Sink.<List<String>> head(), materializer); + final CompletionStage<List<String>> result = Source.from(list).via(f).limit(10).runWith(Sink.seq(), materializer); assertEquals(Arrays.asList("5", "6", "7"), result.toCompletableFuture().get(1, TimeUnit.SECONDS)); } @@ -179,7 +180,7 @@ return arg.toString(); } }).join(inverse.reversed()).join(bidi.reversed()); - final CompletionStage<List<String>> result = Source.from(list).via(f).grouped(10).runWith(Sink.<List<String>> head(), materializer); + final CompletionStage<List<String>> result = Source.from(list).via(f).limit(10).runWith(Sink.seq(), materializer); assertEquals(Arrays.asList("5", "6", "7"), result.toCompletableFuture().get(1, TimeUnit.SECONDS)); } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java index 898b3877e4..b10b5de63d 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -84,7 +84,7 @@ public class FlowGraphTest extends StreamTest { // collecting final Publisher<String> pub = source.runWith(publisher, materializer); - final CompletionStage<List<String>> all = Source.fromPublisher(pub).grouped(100).runWith(Sink.<List<String>>head(), materializer); + final CompletionStage<List<String>> all = Source.fromPublisher(pub).limit(100).runWith(Sink.seq(), materializer);
final List<String> result = all.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); assertEquals(new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet(result)); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 83bd3acbb0..db15b8a1a4 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -260,7 +260,7 @@ public class FlowTest extends StreamTest { .mergeSubstreams(); final CompletionStage<List<List<String>>> future = - Source.from(input).via(flow).grouped(10).runWith(Sink.<List<List<String>>> head(), materializer); + Source.from(input).via(flow).limit(10).runWith(Sink.<List<String>> seq(), materializer); final Object[] result = future.toCompletableFuture().get(1, TimeUnit.SECONDS).toArray(); Arrays.sort(result, (Comparator)(Object) new Comparator<List<String>>() { @Override @@ -286,7 +286,7 @@ .concatSubstreams(); final CompletionStage<List<List<String>>> future = - Source.from(input).via(flow).grouped(10).runWith(Sink.<List<List<String>>> head(), materializer); + Source.from(input).via(flow).limit(10).runWith(Sink.<List<String>> seq(), materializer); final List<List<String>> result = future.toCompletableFuture().get(1, TimeUnit.SECONDS); assertEquals(Arrays.asList(Arrays.asList("A", "B", "C"), Arrays.asList(".", "D"), Arrays.asList(".", "E", "F")), result); @@ -306,7 +306,7 @@ .concatSubstreams(); final CompletionStage<List<List<String>>> future = - Source.from(input).via(flow).grouped(10).runWith(Sink.<List<List<String>>> head(), materializer); + Source.from(input).via(flow).limit(10).runWith(Sink.<List<String>> seq(), materializer); final List<List<String>> result = future.toCompletableFuture().get(1, TimeUnit.SECONDS); assertEquals(Arrays.asList(Arrays.asList("A", "B", "C", "."), Arrays.asList("D", "."), Arrays.asList("E", "F")), result); @@ -359,7 +359,7 @@ // collecting final Publisher<String> pub = source.runWith(publisher, materializer); - final CompletionStage<List<String>> all = Source.fromPublisher(pub).grouped(100).runWith(Sink.<List<String>>head(), materializer); + final CompletionStage<List<String>> all = Source.fromPublisher(pub).limit(100).runWith(Sink.seq(), materializer); final List<String> result = all.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); assertEquals(new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet(result)); @@ -445,7 +445,7 @@ Pair<List<Integer>, Source<Integer, NotUsed>> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(Arrays.asList(1, 2, 3), result.first()); - CompletionStage<List<Integer>> tailFuture = result.second().grouped(4).runWith(Sink.<List<Integer>>head(), materializer); + CompletionStage<List<Integer>> tailFuture = result.second().limit(4).runWith(Sink.seq(), materializer); List<Integer> tailResult = tailFuture.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(Arrays.asList(4, 5, 6), tailResult); } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index d46518c614..06a553f180 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -321,7 +321,7 @@ public class SourceTest extends StreamTest { Pair<List<Integer>, Source<Integer, NotUsed>> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(Arrays.asList(1, 2, 3), result.first()); - CompletionStage<List<Integer>> tailFuture = result.second().grouped(4).runWith(Sink.<List<Integer>>head(), materializer); +
CompletionStage<List<Integer>> tailFuture = result.second().limit(4).runWith(Sink.seq(), materializer); List<Integer> tailResult = tailFuture.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(Arrays.asList(4, 5, 6), tailResult); } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FramingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FramingSpec.scala index 4dccaea418..57e66385c8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FramingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FramingSpec.scala @@ -86,12 +86,12 @@ class FramingSpec extends AkkaSpec { "Respect maximum line settings" in { // The buffer will contain more than 1 bytes, but the individual frames are less Await.result( - Source.single(ByteString("a\nb\nc\nd\n")).via(simpleLines("\n", 1)).grouped(100).runWith(Sink.head), + Source.single(ByteString("a\nb\nc\nd\n")).via(simpleLines("\n", 1)).limit(100).runWith(Sink.seq), 3.seconds) should ===(List("a", "b", "c", "d")) an[FramingException] should be thrownBy { Await.result( - Source.single(ByteString("ab\n")).via(simpleLines("\n", 1)).grouped(100).runWith(Sink.head), + Source.single(ByteString("ab\n")).via(simpleLines("\n", 1)).limit(100).runWith(Sink.seq), 3.seconds) } } @@ -225,7 +225,7 @@ class FramingSpec extends AkkaSpec { val testMessages = List.fill(100)(referenceChunk.take(Random.nextInt(1024))) Await.result( - Source(testMessages).via(codecFlow).grouped(1000).runWith(Sink.head), + Source(testMessages).via(codecFlow).limit(1000).runWith(Sink.seq), 3.seconds) should ===(testMessages) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala index 2a472d7c11..f7b179a831 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala @@ -57,26 +57,26 @@ class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals { "work as a Flow that is open on the left" in { val f = bidi.join(Flow[Long].map(x ⇒ ByteString(s"Hello $x"))) - val result = Source(List(1, 2, 3)).via(f).grouped(10).runWith(Sink.head) + val result = Source(List(1, 2, 3)).via(f).limit(10).runWith(Sink.seq) Await.result(result, 1.second) should ===(Seq("Hello 3", "Hello 4", "Hello 5")) } "work as a Flow that is open on the right" in { val f = Flow[String].map(Integer.valueOf(_).toInt).join(bidi) - val result = Source(List(ByteString("1"), ByteString("2"))).via(f).grouped(10).runWith(Sink.head) + val result = Source(List(ByteString("1"), ByteString("2"))).via(f).limit(10).runWith(Sink.seq) Await.result(result, 1.second) should ===(Seq(3L, 4L)) } "work when atop its inverse" in { val f = bidi.atop(inverse).join(Flow[Int].map(_.toString)) - val result = Source(List(1, 2, 3)).via(f).grouped(10).runWith(Sink.head) + val result = Source(List(1, 2, 3)).via(f).limit(10).runWith(Sink.seq) Await.result(result, 1.second) should ===(Seq("5", "6", "7")) } "work when reversed" in { // just reversed from the case above; observe that Flow inverts itself automatically by being on the left side val f = Flow[Int].map(_.toString).join(inverse.reversed).join(bidi.reversed) - val result = Source(List(1, 2, 3)).via(f).grouped(10).runWith(Sink.head) + val result = Source(List(1, 2, 3)).via(f).limit(10).runWith(Sink.seq) Await.result(result, 1.second) should ===(Seq("5", "6", "7")) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala
b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala index 2d6bda805b..f5e9c2bdaf 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala @@ -42,7 +42,7 @@ class FlowScanSpec extends AkkaSpec { } "emit values promptly" in { - val f = Source.single(1).concat(Source.maybe[Int]).scan(0)(_ + _).grouped(2).runWith(Sink.head) + val f = Source.single(1).concat(Source.maybe[Int]).scan(0)(_ + _).take(2).runWith(Sink.seq) Await.result(f, 1.second) should ===(Seq(0, 1)) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 343bcae9fb..1cc6555e93 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -314,12 +314,12 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val identity1 = Flow[Int].toProcessor val identity2 = Flow.fromProcessor(() ⇒ identity1.run()) Await.result( - Source(1 to 10).via(identity2).grouped(100).runWith(Sink.head), + Source(1 to 10).via(identity2).limit(100).runWith(Sink.seq), 3.seconds) should ===(1 to 10) // Reusable: Await.result( - Source(1 to 10).via(identity2).grouped(100).runWith(Sink.head), + Source(1 to 10).via(identity2).limit(100).runWith(Sink.seq), 3.seconds) should ===(1 to 10) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala index b826a2d0d3..d981ee791f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala @@ -25,7 +25,7 @@ class FlowSupervisionSpec extends AkkaSpec { val failingMap = Flow[Int].map(n ⇒ if (n == 3) throw exc else n) def run(f: Flow[Int, Int, NotUsed]): immutable.Seq[Int] = - Await.result(Source((1 to 5).toSeq ++ (1 to 5)).via(f).grouped(1000).runWith(Sink.head), 3.seconds) + Await.result(Source((1 to 5).toSeq ++ (1 to 5)).via(f).limit(1000).runWith(Sink.seq), 3.seconds) "Stream supervision" must { @@ -47,7 +47,7 @@ class FlowSupervisionSpec extends AkkaSpec { "complete stream with NPE failure when null is emitted" in { intercept[NullPointerException] { - Await.result(Source(List("a", "b")).map(_ ⇒ null).grouped(1000).runWith(Sink.head), 3.seconds) + Await.result(Source(List("a", "b")).map(_ ⇒ null).limit(1000).runWith(Sink.seq), 3.seconds) }.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg) } @@ -55,7 +55,7 @@ class FlowSupervisionSpec extends AkkaSpec { val nullMap = Flow[String].map(elem ⇒ if (elem == "b") null else elem) .withAttributes(supervisionStrategy(Supervision.resumingDecider)) val result = Await.result(Source(List("a", "b", "c")).via(nullMap) - .grouped(1000).runWith(Sink.head), 3.seconds) + .limit(1000).runWith(Sink.seq), 3.seconds) result should be(List("a", "c")) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala index bc01f061af..fa4a64913a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala @@ -14,6 +14,7 @@ import scala.concurrent.Await class 
PublisherSinkSpec extends AkkaSpec { implicit val materializer = ActorMaterializer() + import materializer.executionContext "A PublisherSink" must { @@ -42,7 +43,7 @@ class PublisherSinkSpec extends AkkaSpec { "work with SubscriberSource" in { val (sub, pub) = Source.asSubscriber[Int].toMat(Sink.asPublisher(false))(Keep.both).run() Source(1 to 100).to(Sink.fromSubscriber(sub)).run() - Await.result(Source.fromPublisher(pub).grouped(1000).runWith(Sink.head), 3.seconds) should ===(1 to 100) + Await.result(Source.fromPublisher(pub).limit(1000).runWith(Sink.seq), 3.seconds) should ===(1 to 100) } "be able to use Publisher in materialized value transformation" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala index 15461a56e1..45476cd47b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala @@ -11,7 +11,7 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals { implicit val materializer = ActorMaterializer() val source = Source(List(1, 2, 3)) - val sink = Flow[Int].grouped(10).toMat(Sink.head)(Keep.right) + val sink = Flow[Int].limit(10).toMat(Sink.seq)(Keep.right) "Reverse Arrows in the Graph DSL" must { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SeqSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SeqSinkSpec.scala index 622accd823..112bd3d72a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SeqSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SeqSinkSpec.scala @@ -5,7 +5,8 @@ package akka.stream.scaladsl import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit.AkkaSpec -import scala.concurrent.Await +import scala.collection.immutable +import scala.concurrent.{ Future, Await } import scala.concurrent.duration._ class SeqSinkSpec extends AkkaSpec { @@ -18,16 +19,16 @@ class SeqSinkSpec extends AkkaSpec { "Sink.toSeq" must { "return a Seq[T] from a Source" in { val input = (1 to 6) - val future = Source(input).runWith(Sink.seq) - val result = Await.result(future, 300.millis) + val future: Future[immutable.Seq[Int]] = Source(input).runWith(Sink.seq) + val result: immutable.Seq[Int] = Await.result(future, 300.millis) result should be(input.toSeq) } "return an empty Seq[T] from an empty Source" in { - val input: Seq[Int] = Seq.empty - val future = Source.fromIterator(() ⇒ input.iterator).runWith(Sink.seq) - val result = Await.result(future, 300.millis) - result should be(Seq.empty: Seq[Int]) + val input: immutable.Seq[Int] = Nil + val future: Future[immutable.Seq[Int]] = Source.fromIterator(() ⇒ input.iterator).runWith(Sink.seq) + val result: immutable.Seq[Int] = Await.result(future, 300.millis) + result should be(input) } } } diff --git a/akka-stream/src/main/scala/akka/stream/StreamLimitReachedException.scala b/akka-stream/src/main/scala/akka/stream/StreamLimitReachedException.scala index 6eb8930258..fbc644ea43 100644 --- a/akka-stream/src/main/scala/akka/stream/StreamLimitReachedException.scala +++ b/akka-stream/src/main/scala/akka/stream/StreamLimitReachedException.scala @@ -1,6 +1,7 @@ /** * Copyright (C) 2015 Typesafe Inc. 
*/ + package akka.stream class StreamLimitReachedException(val n: Long) extends RuntimeException(s"limit of $n reached") diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 5676a81376..5920c29b4f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -8,10 +8,12 @@ import akka.{ Done, NotUsed } import akka.actor.{ ActorRef, Props } import akka.stream.Attributes.InputBuffer import akka.stream._ +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.Module import akka.stream.stage._ import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance +import scala.collection.immutable import scala.concurrent.{ Future, Promise } import scala.language.postfixOps import scala.util.{ Failure, Success, Try } @@ -242,6 +244,44 @@ private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedV override def toString: String = "HeadOptionStage" } +private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] { + val in = Inlet[T]("seq.in") + + override val shape: SinkShape[T] = SinkShape.of(in) + + override protected def initialAttributes: Attributes = DefaultAttributes.seqSink + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + val p: Promise[immutable.Seq[T]] = Promise() + val logic = new GraphStageLogic(shape) { + val buf = Vector.newBuilder[T] + + override def preStart(): Unit = pull(in) + + setHandler(in, new InHandler { + + override def onPush(): Unit = { + buf += grab(in) + pull(in) + } + + override def onUpstreamFinish(): Unit = { + val result = buf.result() + p.trySuccess(result) + completeStage() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + p.tryFailure(ex) + failStage(ex) + } + }) + } + + (logic, p.future) + } +} + /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 8726d633e2..56fa6817e4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -102,6 +102,7 @@ private[stream] object Stages { val headOptionSink = name("headOptionSink") and inputBuffer(initial = 1, max = 1) val lastSink = name("lastSink") val lastOptionSink = name("lastOptionSink") + val seqSink = name("seqSink") val publisherSink = name("publisherSink") val fanoutPublisherSink = name("fanoutPublisherSink") val ignoreSink = name("ignoreSink") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 5350c4b328..187bcc1abf 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -461,6 +461,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Completes when''' the defined number of elements has been taken or upstream completes * + * '''Errors when''' the total number of incoming elements exceeds max + * * '''Cancels when''' the defined number of elements has been taken or downstream cancels * * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] @@ -486,6 +488,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Completes when''' the defined
number of elements has been taken or upstream completes * + * '''Errors when''' the accumulated cost exceeds max + * * '''Cancels when''' the defined number of elements has been taken or downstream cancels * * See also [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 802ca4207c..2657d267d0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -656,16 +656,15 @@ trait FlowOps[+Out, +Mat] { * requested from upstream publishers that will then not be processed downstream * of this step. * - * The stream will be completed without producing any elements if `n` is zero - * or negative. - * - * '''Emits when''' the specified number of elements to take has not yet been reached + * '''Emits when''' upstream emits and the number of emitted elements has not reached max * * '''Backpressures when''' downstream backpressures * - * '''Completes when''' the defined number of elements has been taken or upstream completes + * '''Completes when''' upstream completes and the number of emitted elements has not reached max - * '''Cancels when''' the defined number of elements has been taken or downstream cancels + * '''Errors when''' the total number of incoming elements exceeds max + + * '''Cancels when''' downstream cancels * * See also [[FlowOps.take]], [[FlowOps.takeWithin]], [[FlowOps.takeWhile]] */ @@ -681,16 +680,15 @@ trait FlowOps[+Out, +Mat] { * requested from upstream publishers that will then not be processed downstream * of this step. * - * The stream will be completed without producing any elements if `n` is zero - * or negative.
- * - * '''Emits when''' the specified number of elements to take has not yet been reached + * '''Emits when''' upstream emits and the accumulated cost has not reached max * * '''Backpressures when''' downstream backpressures * - * '''Completes when''' the defined number of elements has been taken or upstream completes + * '''Completes when''' upstream completes and the accumulated cost has not reached max * - * '''Cancels when''' the defined number of elements has been taken or downstream cancels + * '''Errors when''' the accumulated cost exceeds max + * + * '''Cancels when''' downstream cancels * * See also [[FlowOps.take]], [[FlowOps.takeWithin]], [[FlowOps.takeWhile]] */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 691a4e4469..2f20a67b1a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -17,6 +17,7 @@ import akka.stream.{ javadsl, _ } import akka.util.ByteString import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec +import scala.collection.immutable import scala.concurrent.duration.{ FiniteDuration, _ } import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Failure, Success, Try } @@ -161,11 +162,7 @@ object Sink { * * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] */ - def seq[T]: Sink[T, Future[Seq[T]]] = { - Flow[T].grouped(Integer.MAX_VALUE).toMat(Sink.headOption)(Keep.right) mapMaterializedValue { e ⇒ - e.map(_.getOrElse(Seq.empty[T]))(ExecutionContexts.sameThreadExecutionContext) - } - } + def seq[T]: Sink[T, Future[immutable.Seq[T]]] = Sink.fromGraph(new SeqStage[T])
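Taken together, the changes above replace the old `grouped(n) + Sink.head` draining idiom with `limit(n)`/`take(n)` followed by the new `Sink.seq`. For reference, here is a minimal, self-contained Scala sketch of the resulting usage pattern. It is not part of the patch: the object name `SinkSeqDemo` is invented for illustration, and it assumes the Akka Streams 2.4.x API that this diff targets.

// Illustrative usage sketch (not part of the patch), assuming Akka Streams 2.4.x
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, StreamLimitReachedException }
import akka.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Failure

object SinkSeqDemo extends App {
  implicit val system = ActorSystem("sink-seq-demo")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // Within the limit: the materialized Future completes with all elements.
  val ok = Await.result(Source(1 to 3).limit(100).runWith(Sink.seq), 3.seconds)
  println(ok) // Vector(1, 2, 3)

  // Over the limit: the materialized Future fails with
  // StreamLimitReachedException instead of buffering without bound.
  Source(1 to 1000).limit(100).runWith(Sink.seq).onComplete {
    case Failure(ex: StreamLimitReachedException) => println(ex.getMessage) // "limit of 100 reached"
    case other                                    => println(s"unexpected: $other")
  }

  // take(n) never fails: it cancels upstream once n elements have passed.
  val firstN = Await.result(Source(1 to 1000).take(100).runWith(Sink.seq), 3.seconds)
  println(firstN.size) // 100

  system.terminate()
}

The design distinction the cookbook text draws follows directly: `limit` is the right guard when exceeding the bound should surface as an error, while `take` is the right one when excess elements can simply be discarded.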