between error propagation and first element. Such race condition surfaced after materialization time decreased substantially.
This commit is contained in:
parent
87d5b8f627
commit
44260fe5d3
2 changed files with 9 additions and 5 deletions
|
|
@ -30,8 +30,7 @@ class FlowScanAsyncSpec extends StreamSpec {
|
|||
.via(sumScanFlow)
|
||||
.runWith(TestSink.probe[Int])
|
||||
.request(1)
|
||||
.expectNext(0)
|
||||
.expectComplete()
|
||||
.expectNextOrComplete(0)
|
||||
}
|
||||
|
||||
"work with a single source" in {
|
||||
|
|
@ -73,7 +72,8 @@ class FlowScanAsyncSpec extends StreamSpec {
|
|||
Source.failed[Int](expected)
|
||||
.via(sumScanFlow)
|
||||
.runWith(TestSink.probe[Int])
|
||||
.expectSubscriptionAndError(expected)
|
||||
.request(2)
|
||||
.expectNextOrError(0, expected)
|
||||
}
|
||||
|
||||
"with the restarting decider" should {
|
||||
|
|
|
|||
|
|
@ -71,10 +71,14 @@ class FlowScanSpec extends StreamSpec {
|
|||
.expectNext(0)
|
||||
.expectComplete()
|
||||
}
|
||||
|
||||
"fail when upstream failed" in {
|
||||
Source.failed[Int](TE("")).scan(0) { case (a, b) ⇒ a + b }.runWith(TestSink.probe[Int])
|
||||
val ex = TE("")
|
||||
Source.failed[Int](ex)
|
||||
.scan(0) { case (a, b) ⇒ a + b }
|
||||
.runWith(TestSink.probe[Int])
|
||||
.request(2)
|
||||
.expectError(TE(""))
|
||||
.expectNextOrError(0, ex)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue