diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index 9c55de5dea..4d51e0c946 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -47,6 +47,17 @@ Stream a single object repeatedly **completes** never +cycle +^^^^^ +Stream iterator in cycled manner. Internally new iterator is being created to cycle the one provided via argument meaning +when original iterator runs out of elements process will start all over again from the beginning of the iterator +provided by the evaluation of provided parameter. If method argument provides empty iterator stream will be terminated with +exception. + +**emits** the next value returned from cycled iterator + +**completes** never + tick ^^^^ A periodical repetition of an arbitrary object. Delay of first tick is specified diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index 27bd060301..b307e23ec3 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -46,6 +46,17 @@ Stream a single object repeatedly **completes** never +cycle +^^^^^ +Stream iterator in cycled manner. Internally new iterator is being created to cycle the one provided via argument meaning +when original iterator runs out of elements process will start all over again from the beginning of the iterator +provided by the evaluation of provided parameter. If method argument provides empty iterator stream will be terminated with +exception. + +**emits** the next value returned from cycled iterator + +**completes** never + tick ^^^^ A periodical repetition of an arbitrary object. Delay of first tick is specified diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index d04dc8642c..84118af688 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -302,6 +302,19 @@ class SourceSpec extends AkkaSpec with DefaultTimeout { } } + "Cycle Source" must { + + "continuously generate the same sequence" in { + val expected = Seq(1, 2, 3, 1, 2, 3, 1, 2, 3) + Source.cycle(() ⇒ List(1, 2, 3).iterator).grouped(9).runWith(Sink.head).futureValue should ===(expected) + } + + "throw an exception in case of empty iterator" in { + val empty = Iterator.empty + assert(Source.cycle(() ⇒ empty).runWith(Sink.head).failed.futureValue.isInstanceOf[IllegalArgumentException]) + } + } + "A Source" must { "suitably override attribute handling methods" in { import Attributes._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 60968ad03f..9c660d004c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -82,6 +82,7 @@ private[stream] object Stages { val publisherSource = name("publisherSource") val iterableSource = name("iterableSource") + val cycledSource = name("cycledSource") val futureSource = name("futureSource") val tickSource = name("tickSource") val singleSource = name("singleSource") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index d9e06c548f..9d2a650a61 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -88,6 +88,20 @@ object Source { def fromIterator[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] = new Source(scaladsl.Source.fromIterator(() ⇒ f.create().asScala)) + /** + * Helper to create 'cycled' [[Source]] from iterator provider. + * Example usage: + * + * {{{ + * Source.cycle(() -> Arrays.asList(1, 2, 3).iterator()); + * }}} + * + * Start a new 'cycled' `Source` from the given elements. The producer stream of elements + * will continue infinitely by repeating the sequence of elements provided by function parameter. + */ + def cycle[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] = + new Source(scaladsl.Source.cycle(() ⇒ f.create().asScala)) + /** * Helper to create [[Source]] from `Iterable`. * Example usage: diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 0b5beab5fc..5fb765495e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -208,6 +208,17 @@ object Source { override def toString: String = "() => Iterator" }) + /** + * Create [[Source]] that will continually produce given elements in specified order. + * + * Start a new 'cycled' `Source` from the given elements. The producer stream of elements + * will continue infinitely by repeating the sequence of elements provided by function parameter. + */ + def cycle[T](f: () ⇒ Iterator[T]): Source[T, NotUsed] = { + val iterator = Iterator.continually { val i = f(); if (i.isEmpty) throw new IllegalArgumentException("empty iterator") else i }.flatten + fromIterator(() ⇒ iterator).withAttributes(DefaultAttributes.cycledSource) + } + /** * A graph with the shape of a source logically is a source, this method makes * it so also in type.