diff --git a/akka-docs/src/main/paradox/stream/operators/Source/single.md b/akka-docs/src/main/paradox/stream/operators/Source/single.md index 92dfdad3bc..8541e26b6d 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/single.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/single.md @@ -25,3 +25,12 @@ Stream a single object @@@ +## Examples + +Scala +: @@snip [source.scala]($akka$/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala) { #imports #source-single } + +Java +: @@snip [source.java]($akka$/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java) { #imports #source-single } + + diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 6a33288fb4..c75e120bab 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -12,7 +12,10 @@ import akka.actor.Status; import akka.japi.Pair; import akka.japi.function.*; import akka.japi.pf.PFBuilder; +//#imports import akka.stream.*; + +//#imports import akka.stream.scaladsl.FlowSpec; import akka.util.ConstantFun; import akka.stream.stage.*; @@ -328,6 +331,22 @@ public class SourceTest extends StreamTest { assertEquals("A", result); } + @Test + public void mustBeAbleToUseSingle() throws Exception { + //#source-single + CompletionStage> future = Source.single("A").runWith(Sink.seq(), materializer); + CompletableFuture> completableFuture = future.toCompletableFuture(); + completableFuture.thenAccept(result -> System.out.printf("collected elements: %s\n", result)); + // result list will contain exactly one element "A" + + //#source-single + // DO NOT use get() directly in your production code! + List result = completableFuture.get(); + assertEquals(1, result.size()); + assertEquals("A", result.get(0)); + + } + @Test public void mustBeAbleToUsePrefixAndTail() throws Exception { final TestKit probe = new TestKit(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index abda45820e..a46a06a214 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -5,7 +5,8 @@ package akka.stream.scaladsl import akka.testkit.DefaultTimeout -import org.scalatest.time.{ Span, Millis } +import org.scalatest.time.{ Millis, Span } + import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ //#imports @@ -27,6 +28,19 @@ class SourceSpec extends StreamSpec with DefaultTimeout { implicit val config = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis)) "Single Source" must { + + "produce exactly one element" in { + implicit val ec = system.dispatcher + //#source-single + val s: Future[immutable.Seq[Int]] = Source.single(1).runWith(Sink.seq) + s.foreach(list ⇒ println(s"Collected elements: $list")) // prints: Collected elements: List(1) + + //#source-single + + s.futureValue should ===(immutable.Seq(1)) + + } + "produce element" in { val p = Source.single(1).runWith(Sink.asPublisher(false)) val c = TestSubscriber.manualProbe[Int]()