diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index d0902cb2ab..f9401cbffc 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -47,6 +47,17 @@ Stream a single object repeatedly **completes** never +cycle +^^^^^ +Stream iterator in cycled manner. Internally new iterator is being created to cycle the one provided via argument meaning +when original iterator runs out of elements process will start all over again from the beginning of the iterator +provided by the evaluation of provided parameter. If method argument provides empty iterator stream will be terminated with +exception. + +**emits** the next value returned from cycled iterator + +**completes** never + tick ^^^^ A periodical repetition of an arbitrary object. Delay of first tick is specified diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index 474ff2dcd6..4380e5c724 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -46,6 +46,17 @@ Stream a single object repeatedly **completes** never +cycle +^^^^^ +Stream iterator in cycled manner. Internally new iterator is being created to cycle the one provided via argument meaning +when original iterator runs out of elements process will start all over again from the beginning of the iterator +provided by the evaluation of provided parameter. If method argument provides empty iterator stream will be terminated with +exception. + +**emits** the next value returned from cycled iterator + +**completes** never + tick ^^^^ A periodical repetition of an arbitrary object. Delay of first tick is specified diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 2088d8e133..88ecfe5e22 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -272,6 +272,19 @@ class SourceSpec extends AkkaSpec with DefaultTimeout { } } + "Cycle Source" must { + + "continuously generate the same sequence" in { + val expected = Seq(1, 2, 3, 1, 2, 3, 1, 2, 3) + Source.cycle(() ⇒ List(1, 2, 3).iterator).grouped(9).runWith(Sink.head).futureValue should ===(expected) + } + + "throw an exception in case of empty iterator" in { + val empty = Iterator.empty + assert(Source.cycle(() ⇒ empty).runWith(Sink.head).failed.futureValue.isInstanceOf[IllegalArgumentException]) + } + } + "A Source" must { "suitably override attribute handling methods" in { import Attributes._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index a171ad7161..1a39051a6b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -80,6 +80,7 @@ private[stream] object Stages { val publisherSource = name("publisherSource") val iterableSource = name("iterableSource") + val cycledSource = name("cycledSource") val futureSource = name("futureSource") val tickSource = name("tickSource") val singleSource = name("singleSource") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index c929f46095..490765af7a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -90,6 +90,20 @@ object Source { def fromIterator[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] = new Source(scaladsl.Source.fromIterator(() ⇒ f.create().asScala)) + /** + * Helper to create 'cycled' [[Source]] from iterator provider. + * Example usage: + * + * {{{ + * Source.cycle(() -> Arrays.asList(1, 2, 3).iterator()); + * }}} + * + * Start a new 'cycled' `Source` from the given elements. The producer stream of elements + * will continue infinitely by repeating the sequence of elements provided by function parameter. + */ + def cycle[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] = + new Source(scaladsl.Source.cycle(() ⇒ f.create().asScala)) + /** * Helper to create [[Source]] from `Iterable`. * Example usage: diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 7740e27fd6..7326d8203b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -203,6 +203,17 @@ object Source { override def toString: String = "() => Iterator" }) + /** + * Create [[Source]] that will continually produce given elements in specified order. + * + * Start a new 'cycled' `Source` from the given elements. The producer stream of elements + * will continue infinitely by repeating the sequence of elements provided by function parameter. + */ + def cycle[T](f: () ⇒ Iterator[T]): Source[T, NotUsed] = { + val iterator = Iterator.continually { val i = f(); if (i.isEmpty) throw new IllegalArgumentException("empty iterator") else i }.flatten + fromIterator(() ⇒ iterator).withAttributes(DefaultAttributes.cycledSource) + } + /** * A graph with the shape of a source logically is a source, this method makes * it so also in type.