Add code sample for operator Source.single (#25375)

* Add code example for Source.single
This commit is contained in:
iyogi 2018-07-20 08:15:13 +02:00 committed by Christopher Batey
parent 61790d763d
commit 3e698be9ce
3 changed files with 43 additions and 1 deletions

View file

@ -25,3 +25,12 @@ Stream a single object
@@@ @@@
## Examples
Scala
: @@snip [source.scala]($akka$/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala) { #imports #source-single }
Java
: @@snip [source.java]($akka$/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java) { #imports #source-single }

View file

@ -12,7 +12,10 @@ import akka.actor.Status;
import akka.japi.Pair; import akka.japi.Pair;
import akka.japi.function.*; import akka.japi.function.*;
import akka.japi.pf.PFBuilder; import akka.japi.pf.PFBuilder;
//#imports
import akka.stream.*; import akka.stream.*;
//#imports
import akka.stream.scaladsl.FlowSpec; import akka.stream.scaladsl.FlowSpec;
import akka.util.ConstantFun; import akka.util.ConstantFun;
import akka.stream.stage.*; import akka.stream.stage.*;
@ -328,6 +331,22 @@ public class SourceTest extends StreamTest {
assertEquals("A", result); assertEquals("A", result);
} }
@Test
public void mustBeAbleToUseSingle() throws Exception {
//#source-single
CompletionStage<List<String>> future = Source.single("A").runWith(Sink.seq(), materializer);
CompletableFuture<List<String>> completableFuture = future.toCompletableFuture();
completableFuture.thenAccept(result -> System.out.printf("collected elements: %s\n", result));
// result list will contain exactly one element "A"
//#source-single
// DO NOT use get() directly in your production code!
List<String> result = completableFuture.get();
assertEquals(1, result.size());
assertEquals("A", result.get(0));
}
@Test @Test
public void mustBeAbleToUsePrefixAndTail() throws Exception { public void mustBeAbleToUsePrefixAndTail() throws Exception {
final TestKit probe = new TestKit(system); final TestKit probe = new TestKit(system);

View file

@ -5,7 +5,8 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import org.scalatest.time.{ Span, Millis } import org.scalatest.time.{ Millis, Span }
import scala.concurrent.{ Await, Future } import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._ import scala.concurrent.duration._
//#imports //#imports
@ -27,6 +28,19 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
implicit val config = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis)) implicit val config = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis))
"Single Source" must { "Single Source" must {
"produce exactly one element" in {
implicit val ec = system.dispatcher
//#source-single
val s: Future[immutable.Seq[Int]] = Source.single(1).runWith(Sink.seq)
s.foreach(list println(s"Collected elements: $list")) // prints: Collected elements: List(1)
//#source-single
s.futureValue should ===(immutable.Seq(1))
}
"produce element" in { "produce element" in {
val p = Source.single(1).runWith(Sink.asPublisher(false)) val p = Source.single(1).runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()