From 743cd98bf41778233b6a44060eea768d9cb83c84 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Thu, 26 Feb 2015 12:36:46 +0100 Subject: [PATCH] fix two wrong/flaky tests - ordering is not preserved by shufflers in GraphOpsIntegrationSpec - larger tolerance is needed in GraphBalanceSpec since balancing does not keep track of previous imbalances - also add Source.repeat(elem) --- .../java/akka/stream/javadsl/FlexiRouteTest.java | 1 + .../test/java/akka/stream/javadsl/SourceTest.java | 8 ++++++++ .../akka/stream/scaladsl/GraphBalanceSpec.scala | 8 ++++---- .../stream/scaladsl/GraphOpsIntegrationSpec.scala | 15 ++++----------- .../scala/akka/stream/scaladsl/SourceSpec.scala | 9 +++++++++ .../main/scala/akka/stream/javadsl/Source.scala | 6 ++++++ .../main/scala/akka/stream/scaladsl/Source.scala | 5 +++++ 7 files changed, 37 insertions(+), 15 deletions(-) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java index 9553ca03b1..85f78bf361 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java @@ -146,6 +146,7 @@ public class FlexiRouteTest { return new RouteLogic() { private State emitToAnyWithDemand = new State(demandFromAny(s.out(0), s.out(1))) { + @SuppressWarnings("unchecked") @Override public State onInput(RouteLogicContext ctx, OutPort out, T element) { ctx.emit((Outlet) out, element); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 3bf6cf035b..331dbf712b 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -451,4 +451,12 @@ public class SourceTest extends StreamTest { String result = Await.result(future2, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); assertEquals("A", result); } + + @Test + public void mustRepeat() throws Exception { + final Future> f = Source.repeat(42).grouped(10000).runWith(Sink.> head(), materializer); + final List result = Await.result(f, FiniteDuration.create(3, TimeUnit.SECONDS)); + assertEquals(result.size(), 10000); + for (Integer i: result) assertEquals(i, (Integer) 42); + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala index 804bb77319..a91e6beab1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala @@ -130,15 +130,15 @@ class GraphBalanceSpec extends AkkaSpec { val (r1, r2, r3) = FlowGraph.closed(outputs, outputs, outputs)(Tuple3.apply) { implicit b ⇒ (o1, o2, o3) ⇒ val balance = b.add(Balance[Int](3, waitForAllDownstreams = true)) - Source(Stream.fill(numElementsForSink * 3)(1)) ~> balance.in + Source.repeat(1).take(numElementsForSink * 3) ~> balance.in balance.out(0) ~> o1.inlet balance.out(1) ~> o2.inlet balance.out(2) ~> o3.inlet }.run() - Await.result(r1, 3.seconds) should be(numElementsForSink +- 1000) - Await.result(r2, 3.seconds) should be(numElementsForSink +- 1000) - Await.result(r3, 3.seconds) should be(numElementsForSink +- 1000) + Await.result(r1, 3.seconds) should be(numElementsForSink +- 2000) + Await.result(r2, 3.seconds) should be(numElementsForSink +- 2000) + Await.result(r3, 3.seconds) should be(numElementsForSink +- 2000) } "produce to second even though first cancels" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala index 74dde24446..b4ad0649d2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala @@ -3,13 +3,13 @@ package akka.stream.scaladsl import scala.collection.immutable import scala.concurrent.{ Future, Await } import scala.concurrent.duration._ - import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit.{ OnNext, SubscriberProbe } import akka.util.ByteString import akka.stream.{ Inlet, Outlet, Shape, Graph } +import org.scalautils.ConversionCheckedTripleEquals object GraphOpsIntegrationSpec { import FlowGraph.Implicits._ @@ -43,7 +43,7 @@ object GraphOpsIntegrationSpec { } -class GraphOpsIntegrationSpec extends AkkaSpec { +class GraphOpsIntegrationSpec extends AkkaSpec with ConversionCheckedTripleEquals { import akka.stream.scaladsl.GraphOpsIntegrationSpec._ import FlowGraph.Implicits._ @@ -185,19 +185,12 @@ class GraphOpsIntegrationSpec extends AkkaSpec { s3.out1 ~> merge.in(0) s3.out2 ~> merge.in(1) - merge.out.grouped(1000) ~> sink.inlet + merge.out.grouped(1000) ~> sink }.run() val result = Await.result(f, 3.seconds) - result.sorted should be(List(4, 5, 6, 13, 14, 15)) - - result.indexOf(4) < result.indexOf(5) should be(true) - result.indexOf(5) < result.indexOf(6) should be(true) - - result.indexOf(13) < result.indexOf(14) should be(true) - result.indexOf(14) < result.indexOf(15) should be(true) - + result.toSet should ===(Set(4, 5, 6, 13, 14, 15)) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index f9bf4aca14..de60bab64d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -157,4 +157,13 @@ class SourceSpec extends AkkaSpec { } } + "Repeat Source" must { + "repeat as long as it takes" in { + import FlowGraph.Implicits._ + val result = Await.result(Source.repeat(42).grouped(10000).runWith(Sink.head()), 1.second) + result.size should ===(10000) + result.toSet should ===(Set(42)) + } + } + } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 026b21638e..1a3c1b2cc0 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -134,6 +134,12 @@ object Source { def single[T](element: T): Source[T, Unit] = new Source(scaladsl.Source.single(element)) + /** + * Create a `Source` that will continually emit the given element. + */ + def repeat[T](element: T): Source[T, Unit] = + new Source(scaladsl.Source.repeat(element)) + /** * Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index c45b1112d2..31847c3433 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -324,6 +324,11 @@ object Source extends SourceApply { */ def single[T](element: T): Source[T, Unit] = apply(SynchronousIterablePublisher(List(element), "single")) // FIXME optimize + /** + * Create a `Source` that will continually emit the given element. + */ + def repeat[T](element: T): Source[T, Unit] = apply(() ⇒ Iterator.continually(element)) // FIXME optimize + /** * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`. */