diff --git a/akka-docs/src/main/paradox/stream/operators/Source/cycle.md b/akka-docs/src/main/paradox/stream/operators/Source/cycle.md index 665274cc61..fb3179aa0e 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/cycle.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/cycle.md @@ -28,3 +28,20 @@ terminated with an exception. @@@ + +## Examples + +Scala +: @@snip [cycle.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala) { #cycle } + +Java +: @@snip [cycle.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java) { #cycle } + + +When iterator is empty the stream will be terminated with _IllegalArgumentException_ + +Scala +: @@snip [cycleError.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala) { #cycle-error } + +Java +: @@snip [cycle.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java) { #cycle-error } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 9347a7c966..20a94400d3 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -40,6 +40,7 @@ import java.util.stream.Stream; import static akka.NotUsed.notUsed; import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription; import static akka.stream.testkit.TestPublisher.ManualProbe; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.*; @SuppressWarnings("serial") @@ -769,6 +770,32 @@ public class SourceTest extends StreamTest { future.toCompletableFuture().get(3, TimeUnit.SECONDS); } + @Test + public void cycleSourceMustGenerateSameSequenceInRepeatedFashion() throws Exception { + //#cycle + final Source source = Source.cycle(() -> Arrays.asList(1, 2, 3).iterator()); + CompletionStage> result = source.grouped(9).runWith(Sink.head(), materializer); + List emittedValues = result.toCompletableFuture().get(); + assertThat(emittedValues, is(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3))); + //#cycle + } + + @Test(expected = IllegalArgumentException.class) + public void cycleSourceMustThr() throws Throwable { + + try { + //#cycle-error + Iterator emptyIterator = Collections.emptyList().iterator(); + Source.cycle(() -> emptyIterator) + .runWith(Sink.head(), materializer) + // stream will be terminated with IllegalArgumentException + //#cycle-error + .toCompletableFuture().get(); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + @Test public void mustBeAbleToUseMerge() throws Exception { final TestKit probe = new TestKit(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index efbdc6ab63..d20e114090 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -330,12 +330,23 @@ class SourceSpec extends StreamSpec with DefaultTimeout { "continuously generate the same sequence" in { val expected = Seq(1, 2, 3, 1, 2, 3, 1, 2, 3) - Source.cycle(() ⇒ List(1, 2, 3).iterator).grouped(9).runWith(Sink.head).futureValue should ===(expected) + //#cycle + Source.cycle(() ⇒ List(1, 2, 3).iterator) + .grouped(9) + .runWith(Sink.head) + // This will produce the Seq(1, 2, 3, 1, 2, 3, 1, 2, 3) + //#cycle + .futureValue should ===(expected) } "throw an exception in case of empty iterator" in { + //#cycle-error val empty = Iterator.empty - assert(Source.cycle(() ⇒ empty).runWith(Sink.head).failed.futureValue.isInstanceOf[IllegalArgumentException]) + Source.cycle(() ⇒ empty) + .runWith(Sink.head) + // This will return a failed future with an `IllegalArgumentException` + //#cycle-error + .failed.futureValue shouldBe an[IllegalArgumentException] } }