diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index fcd009fe61..3f7164b580 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -483,6 +483,28 @@ public class SourceTest extends StreamTest { assertEquals("A", result); } + @Test + public void mustWorkFromRange() throws Exception { + Future> f = Source.range(0, 10).grouped(20).runWith(Sink.> head(), materializer); + final List result = Await.result(f, FiniteDuration.create(3, TimeUnit.SECONDS)); + assertEquals(11, result.size()); + Integer counter = 0; + for (Integer i: result) + assertEquals(i, counter++); + } + + @Test + public void mustWorkFromRangeWithStep() throws Exception { + Future> f = Source.range(0, 10, 2).grouped(20).runWith(Sink.> head(), materializer); + final List result = Await.result(f, FiniteDuration.create(3, TimeUnit.SECONDS)); + assertEquals(6, result.size()); + Integer counter = 0; + for (Integer i: result) { + assertEquals(i, counter); + counter+=2; + } + } + @Test public void mustRepeat() throws Exception { final Future> f = Source.repeat(42).grouped(10000).runWith(Sink.> head(), materializer); diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 3f72020851..b53db067c8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -4,6 +4,7 @@ package akka.stream.javadsl import java.io.{ OutputStream, InputStream, File } +import java.util import akka.actor.{ ActorRef, Cancellable, Props } import akka.event.LoggingAdapter @@ -19,6 +20,7 @@ import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance import scala.collection.JavaConverters._ import scala.collection.immutable +import scala.collection.immutable.Range.Inclusive import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } import scala.language.{ higherKinds, implicitConversions } @@ -112,6 +114,32 @@ object Source { new Source(scaladsl.Source(scalaIterable)) } + /** + * Creates [[Source]] that represents integer values in range ''[start;end]'', step equals to 1. + * It allows to create `Source` out of range as simply as on Scala `Source(1 to N)` + * + * Uses [[scala.collection.immutable.Range.inclusive(Int, Int)]] internally + * + * @see [[scala.collection.immutable.Range.inclusive(Int, Int)]] + */ + def range(start: Int, end: Int): javadsl.Source[Integer, Unit] = range(start, end, 1) + + /** + * Creates [[Source]] that represents integer values in range ''[start;end]'', with the given step. + * It allows to create `Source` out of range as simply as on Scala `Source(1 to N)` + * + * Uses [[scala.collection.immutable.Range.inclusive(Int, Int, Int)]] internally + * + * @see [[scala.collection.immutable.Range.inclusive(Int, Int, Int)]] + */ + def range(start: Int, end: Int, step: Int): javadsl.Source[Integer, Unit] = + fromIterator[Integer](new function.Creator[util.Iterator[Integer]]() { + def create(): util.Iterator[Integer] = + new Inclusive(start, end, step) { + override def toString: String = s"Range($start to $end, step = $step)" + }.iterator.asJava.asInstanceOf[util.Iterator[Integer]] + }) + /** * Start a new `Source` from the given `Future`. The stream will consist of * one element when the `Future` is completed with a successful value, which