diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala index 36cab1a259..d803da24fb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala @@ -13,9 +13,10 @@ import akka.stream.testkit.scaladsl.TestSink import akka.stream.impl.fusing._ import akka.stream.impl.fusing.GraphInterpreter._ import org.scalactic.ConversionCheckedTripleEquals +import org.scalatest.concurrent.ScalaFutures import scala.concurrent.duration.Duration -class GraphStageLogicSpec extends AkkaSpec with GraphInterpreterSpecKit with ConversionCheckedTripleEquals { +class GraphStageLogicSpec extends AkkaSpec with GraphInterpreterSpecKit with ConversionCheckedTripleEquals with ScalaFutures { implicit val materializer = ActorMaterializer() @@ -67,6 +68,18 @@ class GraphStageLogicSpec extends AkkaSpec with GraphInterpreterSpecKit with Con } } + object emitEmptyIterable extends GraphStage[SourceShape[Int]] { + val out = Outlet[Int]("out") + override val shape = SourceShape(out) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + + setHandler(out, new OutHandler { + override def onPull(): Unit = emitMultiple(out, Iterator.empty, () ⇒ emit(out, 42, () ⇒ completeStage())) + }) + + } + } + "A GraphStageLogic" must { "emit all things before completing" in assertAllStagesStopped { @@ -96,6 +109,12 @@ class GraphStageLogicSpec extends AkkaSpec with GraphInterpreterSpecKit with Con .expectComplete() } + "emit properly after empty iterable" in assertAllStagesStopped { + + Source.fromGraph(emitEmptyIterable).runWith(Sink.seq).futureValue should ===(List(42)) + + } + "invoke lifecycle hooks in the right order" in assertAllStagesStopped { val g = new GraphStage[FlowShape[Int, Int]] { val in = Inlet[Int]("in") diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 89ad8b7d83..ebb250c696 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -766,7 +766,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: } else { setOrAddEmitting(out, new EmittingIterator(out, elems, getNonEmittingHandler(out), andThen)) } - } + } else andThen() /** * Emit a sequence of elements through the given outlet, suspending execution if necessary.