diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala index db7e6c87a2..5d0bbf59eb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala @@ -3,9 +3,12 @@ */ package akka.stream.scaladsl +import scala.concurrent.duration._ import akka.stream.MaterializerSettings import akka.stream.testkit.AkkaSpec import akka.stream.testkit.ScriptedTest +import akka.stream.testkit.StreamTestKit.SubscriberProbe +import akka.stream.FlowMaterializer class FlowMapConcatSpec extends AkkaSpec with ScriptedTest { @@ -26,6 +29,19 @@ class FlowMapConcatSpec extends AkkaSpec with ScriptedTest { TestConfig.RandomTestRange foreach (_ ⇒ runScript(script, settings)(_.mapConcat(x ⇒ (1 to x) map (_ ⇒ x)))) } + "map and concat grouping with slow downstream" in { + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + implicit val materializer = FlowMaterializer(settings) + val s = SubscriberProbe[Int] + val input = (1 to 20).grouped(5).toList + Source(input).mapConcat(identity).map(x ⇒ { Thread.sleep(10); x }).runWith(Sink(s)) + val sub = s.expectSubscription() + sub.request(100) + for (i ← 1 to 20) s.expectNext(i) + s.expectComplete() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index d3069151eb..3a91955379 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -54,7 +54,11 @@ private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out]) override def onPull(ctxt: Context[Out]): Directive = if (currentIterator.hasNext) ctxt.push(currentIterator.next()) + else if (isFinishing) ctxt.finish() else ctxt.pull() + + override def onUpstreamFinish(ctxt: Context[Out]): TerminationDirective = + ctxt.absorbTermination() } /** @@ -284,4 +288,4 @@ private[akka] final case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapol } } -} \ No newline at end of file +}