Fixed 19039 for both scaladsl and javadsl

19039: Added cycle method to Source class for both scaladsl and javadsl. Test added.
This commit is contained in:
Oleksii Tkachuk 2016-03-22 21:59:52 -05:00
parent b3444fa1b0
commit 78e0a534f4
6 changed files with 61 additions and 0 deletions

View file

@ -47,6 +47,17 @@ Stream a single object repeatedly
**completes** never
cycle
^^^^^
Stream iterator in cycled manner. Internally new iterator is being created to cycle the one provided via argument meaning
when original iterator runs out of elements process will start all over again from the beginning of the iterator
provided by the evaluation of provided parameter. If method argument provides empty iterator stream will be terminated with
exception.
**emits** the next value returned from cycled iterator
**completes** never
tick
^^^^
A periodical repetition of an arbitrary object. Delay of first tick is specified

View file

@ -46,6 +46,17 @@ Stream a single object repeatedly
**completes** never
cycle
^^^^^
Stream iterator in cycled manner. Internally new iterator is being created to cycle the one provided via argument meaning
when original iterator runs out of elements process will start all over again from the beginning of the iterator
provided by the evaluation of provided parameter. If method argument provides empty iterator stream will be terminated with
exception.
**emits** the next value returned from cycled iterator
**completes** never
tick
^^^^
A periodical repetition of an arbitrary object. Delay of first tick is specified

View file

@ -272,6 +272,19 @@ class SourceSpec extends AkkaSpec with DefaultTimeout {
}
}
"Cycle Source" must {
"continuously generate the same sequence" in {
val expected = Seq(1, 2, 3, 1, 2, 3, 1, 2, 3)
Source.cycle(() List(1, 2, 3).iterator).grouped(9).runWith(Sink.head).futureValue should ===(expected)
}
"throw an exception in case of empty iterator" in {
val empty = Iterator.empty
assert(Source.cycle(() empty).runWith(Sink.head).failed.futureValue.isInstanceOf[IllegalArgumentException])
}
}
"A Source" must {
"suitably override attribute handling methods" in {
import Attributes._

View file

@ -80,6 +80,7 @@ private[stream] object Stages {
val publisherSource = name("publisherSource")
val iterableSource = name("iterableSource")
val cycledSource = name("cycledSource")
val futureSource = name("futureSource")
val tickSource = name("tickSource")
val singleSource = name("singleSource")

View file

@ -90,6 +90,20 @@ object Source {
def fromIterator[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] =
new Source(scaladsl.Source.fromIterator(() f.create().asScala))
/**
* Helper to create 'cycled' [[Source]] from iterator provider.
* Example usage:
*
* {{{
* Source.cycle(() -> Arrays.asList(1, 2, 3).iterator());
* }}}
*
* Start a new 'cycled' `Source` from the given elements. The producer stream of elements
* will continue infinitely by repeating the sequence of elements provided by function parameter.
*/
def cycle[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] =
new Source(scaladsl.Source.cycle(() f.create().asScala))
/**
* Helper to create [[Source]] from `Iterable`.
* Example usage:

View file

@ -203,6 +203,17 @@ object Source {
override def toString: String = "() => Iterator"
})
/**
* Create [[Source]] that will continually produce given elements in specified order.
*
* Start a new 'cycled' `Source` from the given elements. The producer stream of elements
* will continue infinitely by repeating the sequence of elements provided by function parameter.
*/
def cycle[T](f: () Iterator[T]): Source[T, NotUsed] = {
val iterator = Iterator.continually { val i = f(); if (i.isEmpty) throw new IllegalArgumentException("empty iterator") else i }.flatten
fromIterator(() iterator).withAttributes(DefaultAttributes.cycledSource)
}
/**
* A graph with the shape of a source logically is a source, this method makes
* it so also in type.