=str 20843 Scan failure should flow downstream ASAP
This commit is contained in:
parent
f679e14168
commit
a299644b21
2 changed files with 1 additions and 8 deletions
|
|
@ -71,10 +71,9 @@ class FlowScanSpec extends AkkaSpec {
|
||||||
.expectNext(0)
|
.expectNext(0)
|
||||||
.expectComplete()
|
.expectComplete()
|
||||||
}
|
}
|
||||||
"fail after emitting first element when upsrteam failed" in {
|
"fail when upstream failed" in {
|
||||||
Source.failed[Int](TE("")).scan(0) { case (a, b) ⇒ a + b }.runWith(TestSink.probe[Int])
|
Source.failed[Int](TE("")).scan(0) { case (a, b) ⇒ a + b }.runWith(TestSink.probe[Int])
|
||||||
.request(2)
|
.request(2)
|
||||||
.expectNext(0)
|
|
||||||
.expectError(TE(""))
|
.expectError(TE(""))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -324,12 +324,6 @@ private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) ex
|
||||||
completeStage()
|
completeStage()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
override def onUpstreamFailure(ex: Throwable): Unit = setHandler(out, new OutHandler {
|
|
||||||
override def onPull(): Unit = {
|
|
||||||
push(out, aggregator)
|
|
||||||
failStage(ex)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
override def onPull(): Unit = pull(in)
|
override def onPull(): Unit = pull(in)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue