#19457: emitMultiple should handle empty iterator

(cherry picked from commit 96e24c8)
This commit is contained in:
Endre Sándor Varga 2016-01-14 16:35:37 +01:00
parent d4b146588a
commit d5b802bfdc
2 changed files with 21 additions and 2 deletions

View file

@ -13,9 +13,10 @@ import akka.stream.testkit.scaladsl.TestSink
import akka.stream.impl.fusing._
import akka.stream.impl.fusing.GraphInterpreter._
import org.scalactic.ConversionCheckedTripleEquals
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration.Duration
class GraphStageLogicSpec extends AkkaSpec with GraphInterpreterSpecKit with ConversionCheckedTripleEquals {
class GraphStageLogicSpec extends AkkaSpec with GraphInterpreterSpecKit with ConversionCheckedTripleEquals with ScalaFutures {
implicit val materializer = ActorMaterializer()
@ -67,6 +68,18 @@ class GraphStageLogicSpec extends AkkaSpec with GraphInterpreterSpecKit with Con
}
}
object emitEmptyIterable extends GraphStage[SourceShape[Int]] {
val out = Outlet[Int]("out")
override val shape = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull(): Unit = emitMultiple(out, Iterator.empty, () emit(out, 42, () completeStage()))
})
}
}
"A GraphStageLogic" must {
"emit all things before completing" in assertAllStagesStopped {
@ -96,6 +109,12 @@ class GraphStageLogicSpec extends AkkaSpec with GraphInterpreterSpecKit with Con
.expectComplete()
}
"emit properly after empty iterable" in assertAllStagesStopped {
Source.fromGraph(emitEmptyIterable).runWith(Sink.seq).futureValue should ===(List(42))
}
"invoke lifecycle hooks in the right order" in assertAllStagesStopped {
val g = new GraphStage[FlowShape[Int, Int]] {
val in = Inlet[Int]("in")

View file

@ -766,7 +766,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
} else {
setOrAddEmitting(out, new EmittingIterator(out, elems, getNonEmittingHandler(out), andThen))
}
}
} else andThen()
/**
* Emit a sequence of elements through the given outlet, suspending execution if necessary.