=str 20843 Scan.in cannot pull closed port
This commit is contained in:
parent
8d0c163eaf
commit
f679e14168
2 changed files with 29 additions and 1 deletions
|
|
@ -64,5 +64,18 @@ class FlowScanSpec extends AkkaSpec {
|
|||
Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink.probe)
|
||||
.toStrict(1.second) should ===(Seq(0, 1, 4, 9, 16))
|
||||
}
|
||||
|
||||
"scan normally for empty source" in {
|
||||
Source.empty[Int].scan(0) { case (a, b) ⇒ a + b }.runWith(TestSink.probe[Int])
|
||||
.request(2)
|
||||
.expectNext(0)
|
||||
.expectComplete()
|
||||
}
|
||||
"fail after emitting first element when upsrteam failed" in {
|
||||
Source.failed[Int](TE("")).scan(0) { case (a, b) ⇒ a + b }.runWith(TestSink.probe[Int])
|
||||
.request(2)
|
||||
.expectNext(0)
|
||||
.expectError(TE(""))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -315,7 +315,22 @@ private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) ex
|
|||
setHandlers(in, out, self)
|
||||
}
|
||||
})
|
||||
setHandler(in, totallyIgnorantInput)
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = ()
|
||||
override def onUpstreamFinish(): Unit = setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
push(out, aggregator)
|
||||
completeStage()
|
||||
}
|
||||
})
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
push(out, aggregator)
|
||||
failStage(ex)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
override def onPull(): Unit = pull(in)
|
||||
override def onPush(): Unit = {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue