+str #18807 Source.range for Java DSL

This commit is contained in:
Alexander Golubev 2015-12-02 11:20:47 -05:00
parent 9597b00f70
commit 89d32f6f09
2 changed files with 33 additions and 10 deletions

View file

@ -467,12 +467,24 @@ public class SourceTest extends StreamTest {
public void mustWorkFromRange() throws Exception { public void mustWorkFromRange() throws Exception {
Future<List<Integer>> f = Source.range(0, 10).grouped(20).runWith(Sink.<List<Integer>> head(), materializer); Future<List<Integer>> f = Source.range(0, 10).grouped(20).runWith(Sink.<List<Integer>> head(), materializer);
final List<Integer> result = Await.result(f, FiniteDuration.create(3, TimeUnit.SECONDS)); final List<Integer> result = Await.result(f, FiniteDuration.create(3, TimeUnit.SECONDS));
assertEquals(result.size(), 11); assertEquals(11, result.size());
Integer counter = 0; Integer counter = 0;
for (Integer i: result) for (Integer i: result)
assertEquals(i, counter++); assertEquals(i, counter++);
} }
@Test
public void mustWorkFromRangeWithStep() throws Exception {
Future<List<Integer>> f = Source.range(0, 10, 2).grouped(20).runWith(Sink.<List<Integer>> head(), materializer);
final List<Integer> result = Await.result(f, FiniteDuration.create(3, TimeUnit.SECONDS));
assertEquals(6, result.size());
Integer counter = 0;
for (Integer i: result) {
assertEquals(i, counter);
counter+=2;
}
}
@Test @Test
public void mustRepeat() throws Exception { public void mustRepeat() throws Exception {
final Future<List<Integer>> f = Source.repeat(42).grouped(10000).runWith(Sink.<List<Integer>> head(), materializer); final Future<List<Integer>> f = Source.repeat(42).grouped(10000).runWith(Sink.<List<Integer>> head(), materializer);

View file

@ -112,17 +112,28 @@ object Source {
} }
/** /**
* Creates [[Source]] with `start` as the first element and each next element as `previous + 1` until * Creates [[Source]] that represents integer values in range ''[start;end]'', step equals to 1.
* it reaches `end`. It allows to create `Source` out of range as simply as on scala `Source(1 to N)` * It allows to create `Source` out of range as simply as on Scala `Source(1 to N)`
*
* Uses [[scala.collection.immutable.Range.inclusive(Int, Int)]] internally
*
* @see [[scala.collection.immutable.Range.inclusive(Int, Int)]]
*/ */
def range(start: Int, end: Int): javadsl.Source[Integer, Unit] = { def range(start: Int, end: Int): javadsl.Source[Integer, Unit] = range(start, end, 1)
require(start <= end, "start must be less or equal than end")
from(new util.AbstractList[Integer]() { /**
override def get(index: Int) = start + index * Creates [[Source]] that represents integer values in range ''[start;end]'', with the given step.
override def size = end - start + 1 * It allows to create `Source` out of range as simply as on Scala `Source(1 to N)`
override def toString = s"Range($start to $end)" *
* Uses [[scala.collection.immutable.Range.inclusive(Int, Int, Int)]] internally
*
* @see [[scala.collection.immutable.Range.inclusive(Int, Int, Int)]]
*/
def range(start: Int, end: Int, step: Int): javadsl.Source[Integer, Unit] =
fromIterator[Integer](new function.Creator[util.Iterator[Integer]]() {
def create(): util.Iterator[Integer] =
Range.inclusive(start, end, step).iterator.asJava.asInstanceOf[util.Iterator[Integer]]
}) })
}
/** /**
* Start a new `Source` from the given `Future`. The stream will consist of * Start a new `Source` from the given `Future`. The stream will consist of