fix two wrong/flaky tests

- ordering is not preserved by shufflers in GraphOpsIntegrationSpec
- larger tolerance is needed in GraphBalanceSpec since balancing does
  not keep track of previous imbalances
- also add Source.repeat(elem)
This commit is contained in:
Roland Kuhn 2015-02-26 12:36:46 +01:00
parent ac9c61a3a5
commit 743cd98bf4
7 changed files with 37 additions and 15 deletions

View file

@ -146,6 +146,7 @@ public class FlexiRouteTest {
return new RouteLogic<T>() {
private State<OutPort, T> emitToAnyWithDemand = new State<OutPort, T>(demandFromAny(s.out(0), s.out(1))) {
@SuppressWarnings("unchecked")
@Override
public State<T, T> onInput(RouteLogicContext<T> ctx, OutPort out, T element) {
ctx.emit((Outlet<T>) out, element);

View file

@ -451,4 +451,12 @@ public class SourceTest extends StreamTest {
String result = Await.result(future2, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals("A", result);
}
@Test
public void mustRepeat() throws Exception {
final Future<List<Integer>> f = Source.repeat(42).grouped(10000).runWith(Sink.<List<Integer>> head(), materializer);
final List<Integer> result = Await.result(f, FiniteDuration.create(3, TimeUnit.SECONDS));
assertEquals(result.size(), 10000);
for (Integer i: result) assertEquals(i, (Integer) 42);
}
}

View file

@ -130,15 +130,15 @@ class GraphBalanceSpec extends AkkaSpec {
val (r1, r2, r3) = FlowGraph.closed(outputs, outputs, outputs)(Tuple3.apply) { implicit b
(o1, o2, o3)
val balance = b.add(Balance[Int](3, waitForAllDownstreams = true))
Source(Stream.fill(numElementsForSink * 3)(1)) ~> balance.in
Source.repeat(1).take(numElementsForSink * 3) ~> balance.in
balance.out(0) ~> o1.inlet
balance.out(1) ~> o2.inlet
balance.out(2) ~> o3.inlet
}.run()
Await.result(r1, 3.seconds) should be(numElementsForSink +- 1000)
Await.result(r2, 3.seconds) should be(numElementsForSink +- 1000)
Await.result(r3, 3.seconds) should be(numElementsForSink +- 1000)
Await.result(r1, 3.seconds) should be(numElementsForSink +- 2000)
Await.result(r2, 3.seconds) should be(numElementsForSink +- 2000)
Await.result(r3, 3.seconds) should be(numElementsForSink +- 2000)
}
"produce to second even though first cancels" in {

View file

@ -3,13 +3,13 @@ package akka.stream.scaladsl
import scala.collection.immutable
import scala.concurrent.{ Future, Await }
import scala.concurrent.duration._
import akka.stream.ActorFlowMaterializer
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit.{ OnNext, SubscriberProbe }
import akka.util.ByteString
import akka.stream.{ Inlet, Outlet, Shape, Graph }
import org.scalautils.ConversionCheckedTripleEquals
object GraphOpsIntegrationSpec {
import FlowGraph.Implicits._
@ -43,7 +43,7 @@ object GraphOpsIntegrationSpec {
}
class GraphOpsIntegrationSpec extends AkkaSpec {
class GraphOpsIntegrationSpec extends AkkaSpec with ConversionCheckedTripleEquals {
import akka.stream.scaladsl.GraphOpsIntegrationSpec._
import FlowGraph.Implicits._
@ -185,19 +185,12 @@ class GraphOpsIntegrationSpec extends AkkaSpec {
s3.out1 ~> merge.in(0)
s3.out2 ~> merge.in(1)
merge.out.grouped(1000) ~> sink.inlet
merge.out.grouped(1000) ~> sink
}.run()
val result = Await.result(f, 3.seconds)
result.sorted should be(List(4, 5, 6, 13, 14, 15))
result.indexOf(4) < result.indexOf(5) should be(true)
result.indexOf(5) < result.indexOf(6) should be(true)
result.indexOf(13) < result.indexOf(14) should be(true)
result.indexOf(14) < result.indexOf(15) should be(true)
result.toSet should ===(Set(4, 5, 6, 13, 14, 15))
}
}

View file

@ -157,4 +157,13 @@ class SourceSpec extends AkkaSpec {
}
}
"Repeat Source" must {
"repeat as long as it takes" in {
import FlowGraph.Implicits._
val result = Await.result(Source.repeat(42).grouped(10000).runWith(Sink.head()), 1.second)
result.size should ===(10000)
result.toSet should ===(Set(42))
}
}
}

View file

@ -134,6 +134,12 @@ object Source {
def single[T](element: T): Source[T, Unit] =
new Source(scaladsl.Source.single(element))
/**
* Create a `Source` that will continually emit the given element.
*/
def repeat[T](element: T): Source[T, Unit] =
new Source(scaladsl.Source.repeat(element))
/**
* Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`.
*/

View file

@ -324,6 +324,11 @@ object Source extends SourceApply {
*/
def single[T](element: T): Source[T, Unit] = apply(SynchronousIterablePublisher(List(element), "single")) // FIXME optimize
/**
* Create a `Source` that will continually emit the given element.
*/
def repeat[T](element: T): Source[T, Unit] = apply(() Iterator.continually(element)) // FIXME optimize
/**
* A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.
*/