diff --git a/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala index 4a3e725ba6..2665490610 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala @@ -4,20 +4,19 @@ package akka.stream +import akka.stream.impl.fusing.GraphInterpreter import akka.stream.scaladsl._ import akka.stream.testkit.StreamSpec -import akka.stream.impl.fusing.GraphInterpreter -import akka.event.BusLogging class FusingSpec extends StreamSpec { - final val Debug = false implicit val materializer = ActorMaterializer() - def graph(async: Boolean) = - Source.unfold(1)(x ⇒ Some(x → x)).filter(_ % 2 == 1) - .alsoTo(Flow[Int].fold(0)(_ + _).to(Sink.head.named("otherSink")).addAttributes(if (async) Attributes.asyncBoundary else Attributes.none)) - .via(Flow[Int].fold(1)(_ + _).named("mainSink")) + def actorRunningStage = { + GraphInterpreter.currentInterpreter.context + } + + val snitchFlow = Flow[Int].map(x ⇒ { testActor ! actorRunningStage; x }).async "SubFusingActorMaterializer" must { @@ -33,41 +32,56 @@ class FusingSpec extends StreamSpec { } "use multiple actors when there are asynchronous boundaries in the subflows (manual)" in { - def ref = { - val bus = GraphInterpreter.currentInterpreter.log.asInstanceOf[BusLogging] - bus.logSource - } - val async = Flow[Int].map(x ⇒ { testActor ! ref; x }).async + val async = Flow[Int].map(x ⇒ { testActor ! actorRunningStage; x }).async Source(0 to 9) - .map(x ⇒ { testActor ! ref; x }) + .via(snitchFlow.async) .flatMapMerge(5, i ⇒ Source.single(i).via(async)) .grouped(1000) .runWith(Sink.head) .futureValue .sorted should ===(0 to 9) val refs = receiveN(20) - withClue(s"refs=\n${refs.mkString("\n")}") { - refs.toSet.size should ===(11) // main flow + 10 subflows - } + refs.toSet should have size (11) // main flow + 10 subflows } "use multiple actors when there are asynchronous boundaries in the subflows (combinator)" in { - def ref = { - val bus = GraphInterpreter.currentInterpreter.log.asInstanceOf[BusLogging] - bus.logSource - } - val flow = Flow[Int].map(x ⇒ { testActor ! ref; x }) Source(0 to 9) - .map(x ⇒ { testActor ! ref; x }) - .flatMapMerge(5, i ⇒ Source.single(i).via(flow.async)) + .via(snitchFlow) + .flatMapMerge(5, i ⇒ Source.single(i).via(snitchFlow.async)) .grouped(1000) .runWith(Sink.head) .futureValue .sorted should ===(0 to 9) val refs = receiveN(20) - withClue(s"refs=\n${refs.mkString("\n")}") { - refs.toSet.size should ===(11) // main flow + 10 subflows - } + refs.toSet should have size (11) // main flow + 10 subflows + } + + "use one actor per grouped substream when there is an async boundary around the flow (manual)" in { + val in = 0 to 9 + Source(in) + .via(snitchFlow) + .groupBy(in.size, identity) + .via(snitchFlow.async) + .mergeSubstreams + .runWith(Sink.seq) + .futureValue.sorted should ===(in) + val refs = receiveN(in.size + in.size) // each element through the first map, then the second map + + refs.toSet should have size (in.size + 1) // outer/main actor + 1 actor per subflow + } + + "use one actor per grouped substream when there is an async boundary around the flow (combinator)" in { + val in = 0 to 9 + Source(in) + .via(snitchFlow) + .groupBy(in.size, identity) + .via(snitchFlow) + .async + .mergeSubstreams + .runWith(Sink.seq) + .futureValue.sorted should ===(in) + val refs = receiveN(in.size + in.size) // each element through the first map, then the second map + refs.toSet should have size (in.size + 1) // outer/main actor + 1 actor per subflow } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 406790f93f..b67aafb7fb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -6,7 +6,7 @@ package akka.stream.scaladsl import java.util -import akka.NotUsed +import akka.{ Done, NotUsed } import akka.actor.ActorSystem import akka.stream.Attributes._ import akka.stream.impl.SinkModule @@ -26,6 +26,8 @@ import akka.stream.testkit.scaladsl.TestSource import akka.stream.testkit.scaladsl.TestSink import java.util.concurrent.ThreadLocalRandom +import akka.testkit.TestLatch + object FlowGroupBySpec { implicit class Lift[M](val f: SubFlow[Int, M, Source[Int, M]#Repr, RunnableGraph[M]]) extends AnyVal { @@ -617,6 +619,35 @@ class FlowGroupBySpec extends StreamSpec { upstreamSubscription.sendComplete() } + "not block all substreams when one is blocked but has a buffer in front" in assertAllStagesStopped { + case class Elem(id: Int, substream: Int, f: () ⇒ Any) + val queue = Source.queue[Elem](3, OverflowStrategy.backpressure) + .groupBy(2, _.substream) + .buffer(2, OverflowStrategy.backpressure) + .map { _.f() }.async + .to(Sink.ignore) + .run() + + val threeProcessed = Promise[Done]() + val blockSubStream1 = TestLatch() + List( + Elem(1, 1, () ⇒ { + // timeout just to not wait forever if something is wrong, not really relevant for test + Await.result(blockSubStream1, 10.seconds) + 1 + }), + Elem(2, 1, () ⇒ 2), + Elem(3, 2, () ⇒ { + threeProcessed.success(Done) + 3 + })).foreach(queue.offer) + // two and three are processed as fast as possible, not blocked by substream 1 being clogged + threeProcessed.future.futureValue should ===(Done) + // let 1 pass so stream can complete + blockSubStream1.open() + queue.complete() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 5bbf1ebc68..c63927f39c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -743,7 +743,11 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff case OptionVal.Some(n) ⇒ n case OptionVal.None ⇒ islandName } - materializer.actorOf(props, actorName) + + val ref = materializer.actorOf(props, actorName) + if (PhasedFusingActorMaterializer.Debug) { + println(s"Spawned actor [$ref] with shell: $shell") + } } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index dd7076c50c..cc94b8ad6d 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -114,6 +114,9 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * | +------+ +------+ | * +----------------------------+ * }}} + * + * Note that attributes set on the returned graph, including async boundaries are now for the entire graph and not + * the `SubFlow`. for example `async` will not have any effect as the returned graph is the entire, closed graph. */ def to(sink: Graph[SinkShape[Out], _]): Sink[In, Mat] = new Sink(delegate.to(sink)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/SubFlow.scala index c4a87caf73..cebfb5bfb0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/SubFlow.scala @@ -21,6 +21,9 @@ trait SubFlow[+Out, +Mat, +F[+_], C] extends FlowOps[Out, Mat] { /** * Attach a [[Sink]] to each sub-flow, closing the overall Graph that is being * constructed. + * + * Note that attributes set on the returned graph, including async boundaries are now for the entire graph and not + * the `SubFlow`. for example `async` will not have any effect as the returned graph is the entire, closed graph. */ def to[M](sink: Graph[SinkShape[Out], M]): C