diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/last.md b/akka-docs/src/main/paradox/stream/operators/Sink/last.md index 46c3b485b2..85fd75af19 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/last.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/last.md @@ -26,4 +26,10 @@ completes. If the stream completes with no elements the @scala[`Future`] @java[` @@@ +## Example +Scala +: @@snip [LastSinkSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala) { #last-operator-example } + +Java +: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #last-operator-example } \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/lastOption.md b/akka-docs/src/main/paradox/stream/operators/Sink/lastOption.md index 78fe557f31..d9eddd04a0 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/lastOption.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/lastOption.md @@ -27,3 +27,10 @@ completed with @scala[`None`] @java[an empty `Optional`]. @@@ +## Example + +Scala +: @@snip [LastSinkSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala) { #lastOption-operator-example } + +Java +: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #lastOption-operator-example } \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/takeLast.md b/akka-docs/src/main/paradox/stream/operators/Sink/takeLast.md index 0e992f1765..43887d3bf6 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/takeLast.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/takeLast.md @@ -28,4 +28,10 @@ If there is a failure signaled in the stream the @scala[`Future`] @java[`Complet @@@ +## Example +Scala +: @@snip [TakeLastSinkSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TakeLastSinkSpec.scala) { #takeLast-operator-example } + +Java +: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #takeLast-operator-example } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java b/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java index 759585d769..0a4dfc39df 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java @@ -6,30 +6,75 @@ package jdocs.stream.operators; import akka.NotUsed; import akka.actor.ActorSystem; + import akka.stream.ActorMaterializer; import akka.stream.Materializer; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; - -import java.util.Arrays; +//#takeLast-operator-example +import akka.japi.Pair; +//#takeLast-operator-example +import java.util.*; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class SinkDocExamples { + private final static ActorSystem system = ActorSystem.create("SourceFromExample"); + private final static Materializer materializer = ActorMaterializer.create(system); + static void reduceExample() throws InterruptedException, ExecutionException, TimeoutException { - final ActorSystem system = ActorSystem.create("SourceFromExample"); - final Materializer materializer = ActorMaterializer.create(system); //#reduce-operator-example Source ints = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); CompletionStage sum = ints.runWith(Sink.reduce((a, b) -> a + b), materializer); - int result = sum.toCompletableFuture().get(3, TimeUnit.SECONDS); - System.out.println(result); + sum.thenAccept(System.out::println); // 55 //#reduce-operator-example } -} + static void takeLastExample() throws InterruptedException, ExecutionException, TimeoutException { + //#takeLast-operator-example + // pair of (Name, GPA) + List sortedStudents = Arrays.asList(new Pair<>("Benita", 2.1), new Pair<>("Adrian", 3.1), + new Pair<>("Alexis", 4), new Pair<>("Kendra", 4.2), new Pair<>("Jerrie", 4.3), new Pair<>("Alison", 4.7)); + + Source studentSource = Source.from(sortedStudents); + + CompletionStage> topThree = studentSource.runWith(Sink.takeLast(3), materializer); + + topThree.thenAccept(result -> { + System.out.println("#### Top students ####"); + for (int i = result.size() - 1; i >= 0; i--) { + Pair s = result.get(i); + System.out.println("Name: " + s.first() + ", " + "GPA: " + s.second()); + } + }); + /* + #### Top students #### + Name: Alison, GPA: 4.7 + Name: Jerrie, GPA: 4.3 + Name: Kendra, GPA: 4.2 + */ + //#takeLast-operator-example + } + + static void lastExample() throws InterruptedException, ExecutionException, TimeoutException { + //#last-operator-example + Source source = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + CompletionStage result = source.runWith(Sink.last(), materializer); + result.thenAccept(System.out::println); + // 10 + //#last-operator-example + } + + static void lastOptionExample() throws InterruptedException, ExecutionException, TimeoutException { + //#lastOption-operator-example + Source source = Source.empty(); + CompletionStage> result = source.runWith(Sink.lastOption(), materializer); + result.thenAccept(System.out::println); + // Optional.empty + //#lastOption-operator-example + } +} \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala index fb85fd56d8..828ce75b7f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LastSinkSpec.scala @@ -4,25 +4,30 @@ package akka.stream.scaladsl -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerSettings +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit._ -import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.StreamTestKit._ +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Future } + class LastSinkSpec extends StreamSpec with ScriptedTest { val settings = ActorMaterializerSettings(system) - implicit val materializer = ActorMaterializer(settings) + implicit val materializer: ActorMaterializer = ActorMaterializer(settings) + implicit val ec = system.dispatcher "A Flow with Sink.last" must { - "yield the last value" in assertAllStagesStopped { - Await.result(Source(1 to 42).map(identity).runWith(Sink.last), 1.second) should be(42) + "yield the last value" in { + //#last-operator-example + val source = Source(1 to 10) + val result: Future[Int] = source.runWith(Sink.last) + result.map(println) + // 10 + //#last-operator-example + result.futureValue shouldEqual 10 } "yield the first error" in assertAllStagesStopped { @@ -52,8 +57,14 @@ class LastSinkSpec extends StreamSpec with ScriptedTest { } should be theSameInstanceAs (ex) } - "yield None for empty stream" in assertAllStagesStopped { - Await.result(Source.empty[Int].runWith(Sink.lastOption), 1.second) should be(None) + "yield None for empty stream" in { + //#lastOption-operator-example + val source = Source.empty[Int] + val result: Future[Option[Int]] = source.runWith(Sink.lastOption) + result.map(println) + // None + //#lastOption-operator-example + result.futureValue shouldEqual None } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TakeLastSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TakeLastSinkSpec.scala index ab1c983390..baff0e11dc 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TakeLastSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TakeLastSinkSpec.scala @@ -25,6 +25,36 @@ class TakeLastSinkSpec extends StreamSpec { result should be(Seq(4, 5, 6)) } + "return top three student based on GPA correctly" in { + implicit val ex = system.dispatcher + //#takeLast-operator-example + case class Student(name: String, gpa: Double) + + val students = List(Student("Alison", 4.7), Student("Adrian", 3.1), Student("Alexis", 4), + Student("Benita", 2.1), Student("Kendra", 4.2), Student("Jerrie", 4.3)).sortBy(_.gpa) + + val sourceOfStudents = Source(students) + + val result: Future[Seq[Student]] = sourceOfStudents.runWith(Sink.takeLast(3)) + + result.foreach { topThree ⇒ + println("#### Top students ####") + topThree.reverse foreach { s ⇒ + println(s"Name: ${s.name}, GPA: ${s.gpa}") + } + } + /* + #### Top students #### + Name: Alison, GPA: 4.7 + Name: Jerrie, GPA: 4.3 + Name: Kendra, GPA: 4.2 + */ + + //#takeLast-operator-example + + result.futureValue shouldEqual students.takeRight(3) + } + "return the number of elements taken when the stream completes" in { val input = 1 to 4 val future: Future[immutable.Seq[Int]] = Source(input).runWith(Sink.takeLast(5))