Merge pull request #19038 from agolubev/agolubev-#18807-range-Source-for-Java-DSL
+str #18807 add Source.range for Java DSL
This commit is contained in:
commit
d5d099725a
2 changed files with 50 additions and 0 deletions
|
|
@ -483,6 +483,28 @@ public class SourceTest extends StreamTest {
|
||||||
assertEquals("A", result);
|
assertEquals("A", result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mustWorkFromRange() throws Exception {
|
||||||
|
Future<List<Integer>> f = Source.range(0, 10).grouped(20).runWith(Sink.<List<Integer>> head(), materializer);
|
||||||
|
final List<Integer> result = Await.result(f, FiniteDuration.create(3, TimeUnit.SECONDS));
|
||||||
|
assertEquals(11, result.size());
|
||||||
|
Integer counter = 0;
|
||||||
|
for (Integer i: result)
|
||||||
|
assertEquals(i, counter++);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mustWorkFromRangeWithStep() throws Exception {
|
||||||
|
Future<List<Integer>> f = Source.range(0, 10, 2).grouped(20).runWith(Sink.<List<Integer>> head(), materializer);
|
||||||
|
final List<Integer> result = Await.result(f, FiniteDuration.create(3, TimeUnit.SECONDS));
|
||||||
|
assertEquals(6, result.size());
|
||||||
|
Integer counter = 0;
|
||||||
|
for (Integer i: result) {
|
||||||
|
assertEquals(i, counter);
|
||||||
|
counter+=2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void mustRepeat() throws Exception {
|
public void mustRepeat() throws Exception {
|
||||||
final Future<List<Integer>> f = Source.repeat(42).grouped(10000).runWith(Sink.<List<Integer>> head(), materializer);
|
final Future<List<Integer>> f = Source.repeat(42).grouped(10000).runWith(Sink.<List<Integer>> head(), materializer);
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@
|
||||||
package akka.stream.javadsl
|
package akka.stream.javadsl
|
||||||
|
|
||||||
import java.io.{ OutputStream, InputStream, File }
|
import java.io.{ OutputStream, InputStream, File }
|
||||||
|
import java.util
|
||||||
|
|
||||||
import akka.actor.{ ActorRef, Cancellable, Props }
|
import akka.actor.{ ActorRef, Cancellable, Props }
|
||||||
import akka.event.LoggingAdapter
|
import akka.event.LoggingAdapter
|
||||||
|
|
@ -19,6 +20,7 @@ import org.reactivestreams.{ Publisher, Subscriber }
|
||||||
import scala.annotation.unchecked.uncheckedVariance
|
import scala.annotation.unchecked.uncheckedVariance
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
|
import scala.collection.immutable.Range.Inclusive
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
import scala.concurrent.{ Future, Promise }
|
import scala.concurrent.{ Future, Promise }
|
||||||
import scala.language.{ higherKinds, implicitConversions }
|
import scala.language.{ higherKinds, implicitConversions }
|
||||||
|
|
@ -112,6 +114,32 @@ object Source {
|
||||||
new Source(scaladsl.Source(scalaIterable))
|
new Source(scaladsl.Source(scalaIterable))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates [[Source]] that represents integer values in range ''[start;end]'', step equals to 1.
|
||||||
|
* It allows to create `Source` out of range as simply as on Scala `Source(1 to N)`
|
||||||
|
*
|
||||||
|
* Uses [[scala.collection.immutable.Range.inclusive(Int, Int)]] internally
|
||||||
|
*
|
||||||
|
* @see [[scala.collection.immutable.Range.inclusive(Int, Int)]]
|
||||||
|
*/
|
||||||
|
def range(start: Int, end: Int): javadsl.Source[Integer, Unit] = range(start, end, 1)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates [[Source]] that represents integer values in range ''[start;end]'', with the given step.
|
||||||
|
* It allows to create `Source` out of range as simply as on Scala `Source(1 to N)`
|
||||||
|
*
|
||||||
|
* Uses [[scala.collection.immutable.Range.inclusive(Int, Int, Int)]] internally
|
||||||
|
*
|
||||||
|
* @see [[scala.collection.immutable.Range.inclusive(Int, Int, Int)]]
|
||||||
|
*/
|
||||||
|
def range(start: Int, end: Int, step: Int): javadsl.Source[Integer, Unit] =
|
||||||
|
fromIterator[Integer](new function.Creator[util.Iterator[Integer]]() {
|
||||||
|
def create(): util.Iterator[Integer] =
|
||||||
|
new Inclusive(start, end, step) {
|
||||||
|
override def toString: String = s"Range($start to $end, step = $step)"
|
||||||
|
}.iterator.asJava.asInstanceOf[util.Iterator[Integer]]
|
||||||
|
})
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start a new `Source` from the given `Future`. The stream will consist of
|
* Start a new `Source` from the given `Future`. The stream will consist of
|
||||||
* one element when the `Future` is completed with a successful value, which
|
* one element when the `Future` is completed with a successful value, which
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue