25468: added examples for Stream # cycle operator. (#26163)
This commit is contained in:
parent
f3b7d316b2
commit
1cae9b0d44
3 changed files with 57 additions and 2 deletions
|
|
@ -28,3 +28,20 @@ terminated with an exception.
|
|||
|
||||
@@@
|
||||
|
||||
|
||||
## Examples
|
||||
|
||||
Scala
|
||||
: @@snip [cycle.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala) { #cycle }
|
||||
|
||||
Java
|
||||
: @@snip [cycle.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java) { #cycle }
|
||||
|
||||
|
||||
When iterator is empty the stream will be terminated with _IllegalArgumentException_
|
||||
|
||||
Scala
|
||||
: @@snip [cycleError.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala) { #cycle-error }
|
||||
|
||||
Java
|
||||
: @@snip [cycle.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java) { #cycle-error }
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ import java.util.stream.Stream;
|
|||
import static akka.NotUsed.notUsed;
|
||||
import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription;
|
||||
import static akka.stream.testkit.TestPublisher.ManualProbe;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
|
|
@ -769,6 +770,32 @@ public class SourceTest extends StreamTest {
|
|||
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cycleSourceMustGenerateSameSequenceInRepeatedFashion() throws Exception {
|
||||
//#cycle
|
||||
final Source<Integer, NotUsed> source = Source.cycle(() -> Arrays.asList(1, 2, 3).iterator());
|
||||
CompletionStage<List<Integer>> result = source.grouped(9).runWith(Sink.head(), materializer);
|
||||
List<Integer> emittedValues = result.toCompletableFuture().get();
|
||||
assertThat(emittedValues, is(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3)));
|
||||
//#cycle
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void cycleSourceMustThr() throws Throwable {
|
||||
|
||||
try {
|
||||
//#cycle-error
|
||||
Iterator<Integer> emptyIterator = Collections.<Integer>emptyList().iterator();
|
||||
Source.cycle(() -> emptyIterator)
|
||||
.runWith(Sink.head(), materializer)
|
||||
// stream will be terminated with IllegalArgumentException
|
||||
//#cycle-error
|
||||
.toCompletableFuture().get();
|
||||
} catch (ExecutionException e) {
|
||||
throw e.getCause();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseMerge() throws Exception {
|
||||
final TestKit probe = new TestKit(system);
|
||||
|
|
|
|||
|
|
@ -330,12 +330,23 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
|
|||
|
||||
"continuously generate the same sequence" in {
|
||||
val expected = Seq(1, 2, 3, 1, 2, 3, 1, 2, 3)
|
||||
Source.cycle(() ⇒ List(1, 2, 3).iterator).grouped(9).runWith(Sink.head).futureValue should ===(expected)
|
||||
//#cycle
|
||||
Source.cycle(() ⇒ List(1, 2, 3).iterator)
|
||||
.grouped(9)
|
||||
.runWith(Sink.head)
|
||||
// This will produce the Seq(1, 2, 3, 1, 2, 3, 1, 2, 3)
|
||||
//#cycle
|
||||
.futureValue should ===(expected)
|
||||
}
|
||||
|
||||
"throw an exception in case of empty iterator" in {
|
||||
//#cycle-error
|
||||
val empty = Iterator.empty
|
||||
assert(Source.cycle(() ⇒ empty).runWith(Sink.head).failed.futureValue.isInstanceOf[IllegalArgumentException])
|
||||
Source.cycle(() ⇒ empty)
|
||||
.runWith(Sink.head)
|
||||
// This will return a failed future with an `IllegalArgumentException`
|
||||
//#cycle-error
|
||||
.failed.futureValue shouldBe an[IllegalArgumentException]
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue