+str #18807 add Source.range for Java DSL

This commit is contained in:
Alexander Golubev 2015-11-27 22:24:24 -05:00
parent b4fc3c11d8
commit 18f1bc73fe
2 changed files with 23 additions and 0 deletions

View file

@ -463,6 +463,16 @@ public class SourceTest extends StreamTest {
assertEquals("A", result);
}
@Test
public void mustWorkFromRange() throws Exception {
Future<List<Integer>> f = Source.range(0, 10).grouped(20).runWith(Sink.<List<Integer>> head(), materializer);
final List<Integer> result = Await.result(f, FiniteDuration.create(3, TimeUnit.SECONDS));
assertEquals(result.size(), 11);
Integer counter = 0;
for (Integer i: result)
assertEquals(i, counter++);
}
@Test
public void mustRepeat() throws Exception {
final Future<List<Integer>> f = Source.repeat(42).grouped(10000).runWith(Sink.<List<Integer>> head(), materializer);

View file

@ -4,6 +4,7 @@
package akka.stream.javadsl
import java.io.{ OutputStream, InputStream, File }
import java.util
import akka.actor.{ ActorRef, Cancellable, Props }
import akka.event.LoggingAdapter
@ -110,6 +111,18 @@ object Source {
new Source(scaladsl.Source(scalaIterable))
}
/**
* Creates [[Source]] with `start` as the first element and each next element as `previous + 1` until
* it reaches `end`. It allows to create `Source` out of range as simply as on scala `Source(1 to N)`
*/
def range(start: Int, end: Int): javadsl.Source[Integer, Unit] = {
require(start <= end, "start must be less or equal than end")
from(new util.AbstractList[Integer]() {
override def get(index: Int) = start + index
override def size() = end - start + 1
})
}
/**
* Start a new `Source` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which