ScanAsync did not complete in all scenarios #24036 (#24090)

This commit is contained in:
Johan Andrén 2017-12-05 09:08:33 +01:00 committed by Christopher Batey
parent 3eed8e79d0
commit d52be450f3
2 changed files with 40 additions and 3 deletions

View file

@ -3,10 +3,11 @@
*/
package akka.stream.scaladsl
import akka.pattern
import akka.{ NotUsed, pattern }
import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision }
import akka.stream.impl.ReactiveStreamsCompliance
import akka.stream.testkit.TestSubscriber.Probe
import akka.stream.testkit.Utils.TE
import akka.stream.testkit._
import akka.stream.testkit.scaladsl._
@ -33,6 +34,32 @@ class FlowScanAsyncSpec extends StreamSpec {
.expectNextOrComplete(0)
}
"complete after zero-element has been consumed" in {
val (pub, sub) =
TestSource.probe[Int]
.via(Flow[Int].scanAsync(0)((acc, in) Future.successful(acc + in)))
.toMat(TestSink.probe)(Keep.both)
.run()
sub.request(10)
sub.expectNext(0)
pub.sendComplete()
sub.expectComplete()
}
"fail after zero-element has been consumed" in {
val (pub, sub) =
TestSource.probe[Int]
.via(Flow[Int].scanAsync(0)((acc, in) Future.successful(acc + in)))
.toMat(TestSink.probe)(Keep.both)
.run()
sub.request(10)
sub.expectNext(0)
pub.sendError(TE("bang"))
sub.expectError(TE("bang"))
}
"work with a single source" in {
Source.single(1)
.via(sumScanFlow)

View file

@ -432,7 +432,8 @@ private[stream] object Collect {
private lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
private val ZeroHandler: OutHandler with InHandler = new OutHandler with InHandler {
override def onPush(): Unit = ()
override def onPush(): Unit =
throw new IllegalStateException("No push should happen before zero value has been consumed")
override def onPull(): Unit = {
push(out, current)
@ -507,7 +508,16 @@ private[stream] object Collect {
}
}
override def onUpstreamFinish(): Unit = {}
override def onUpstreamFinish(): Unit = {
if (current == zero) {
eventualCurrent.value match {
case Some(Success(`zero`))
// #24036 upstream completed without emitting anything but after zero was emitted downstream
completeStage()
case _ // in all other cases we will get a complete when the future completes
}
}
}
override val toString: String = s"ScanAsync.Logic(completed=${eventualCurrent.isCompleted})"
}