=str #16338 Emit final elements in mapConcat

This commit is contained in:
Patrik Nordwall 2014-11-19 19:50:23 +01:00
parent 523605349c
commit 837ce3eb12
2 changed files with 21 additions and 1 deletions

View file

@ -54,7 +54,11 @@ private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out])
override def onPull(ctxt: Context[Out]): Directive =
if (currentIterator.hasNext) ctxt.push(currentIterator.next())
else if (isFinishing) ctxt.finish()
else ctxt.pull()
override def onUpstreamFinish(ctxt: Context[Out]): TerminationDirective =
ctxt.absorbTermination()
}
/**
@ -284,4 +288,4 @@ private[akka] final case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapol
}
}
}
}