diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala index cb347e1958..623e6f58b0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala @@ -42,7 +42,6 @@ object GraphOpsIntegrationSpec { class GraphOpsIntegrationSpec extends StreamSpec(""" akka.stream.materializer.initial-input-buffer-size = 2 """) { - import GraphDSL.Implicits._ import akka.stream.scaladsl.GraphOpsIntegrationSpec._ "GraphDSLs" must { @@ -50,6 +49,7 @@ class GraphOpsIntegrationSpec extends StreamSpec(""" "support broadcast - merge layouts" in { val resultFuture = RunnableGraph .fromGraph(GraphDSL.create(Sink.head[Seq[Int]]) { implicit b => (sink) => + import GraphDSL.Implicits._ val bcast = b.add(Broadcast[Int](2)) val merge = b.add(Merge[Int](2)) @@ -68,6 +68,7 @@ class GraphOpsIntegrationSpec extends StreamSpec(""" val elements = 0 to 10 val out = RunnableGraph .fromGraph(GraphDSL.create(Sink.head[Seq[Int]]) { implicit b => (sink) => + import GraphDSL.Implicits._ val balance = b.add(Balance[Int](5)) val merge = b.add(Merge[Int](5)) @@ -89,6 +90,7 @@ class GraphOpsIntegrationSpec extends StreamSpec(""" val (resultFuture2, resultFuture9, resultFuture10) = RunnableGraph .fromGraph(GraphDSL.create(seqSink, seqSink, seqSink)(Tuple3.apply) { implicit b => (sink2, sink9, sink10) => + import GraphDSL.Implicits._ val b3 = b.add(Broadcast[Int](2)) val b7 = b.add(Broadcast[Int](2)) val b11 = b.add(Broadcast[Int](3)) @@ -137,6 +139,7 @@ class GraphOpsIntegrationSpec extends StreamSpec(""" val resultFuture = RunnableGraph .fromGraph(GraphDSL.create(Sink.head[Seq[Int]]) { implicit b => (sink) => + import GraphDSL.Implicits._ val bcast = b.add(Broadcast[Int](2)) val merge = b.add(Merge[Int](2)) @@ -157,6 +160,7 @@ class GraphOpsIntegrationSpec extends StreamSpec(""" val flow = Flow[Int].map(_ * 2) RunnableGraph .fromGraph(GraphDSL.create() { implicit builder => + import GraphDSL.Implicits._ Source.fromPublisher(p) ~> flow ~> Sink.fromSubscriber(s) ClosedShape }) @@ -175,6 +179,7 @@ class GraphOpsIntegrationSpec extends StreamSpec(""" val f: Future[Seq[Int]] = RunnableGraph .fromGraph(GraphDSL.create(shuffler, shuffler, shuffler, Sink.head[Seq[Int]])((_, _, _, fut) => fut) { implicit b => (s1, s2, s3, sink) => + import GraphDSL.Implicits._ val merge = b.add(Merge[Int](2)) Source(List(1, 2, 3)) ~> s1.in1 @@ -209,6 +214,7 @@ class GraphOpsIntegrationSpec extends StreamSpec(""" val g: RunnableGraph[Seq[Future[String]]] = RunnableGraph.fromGraph(GraphDSL.create(sinks) { implicit b => sinkList => + import GraphDSL.Implicits._ val broadcast = b.add(Broadcast[String](sinkList.size)) Source(List("ax", "bx", "cx")) ~> broadcast @@ -235,6 +241,7 @@ class GraphOpsIntegrationSpec extends StreamSpec(""" val g: RunnableGraph[Seq[Future[immutable.Seq[Int]]]] = RunnableGraph.fromGraph(GraphDSL.create(sinks) { implicit b => sinkList => + import GraphDSL.Implicits._ val broadcast = b.add(Broadcast[Int](sinkList.size)) Source(List(1, 2, 3)) ~> broadcast