parent
b005c906d9
commit
c8026dc714
1 changed files with 4 additions and 9 deletions
|
|
@ -15,12 +15,9 @@ class TickSourceSpec extends StreamSpec {
|
|||
"A Flow based on tick publisher" must {
|
||||
"produce ticks" in assertAllStagesStopped {
|
||||
val c = TestSubscriber.manualProbe[String]()
|
||||
Source.tick(1.second, 500.millis, "tick").to(Sink.fromSubscriber(c)).run()
|
||||
Source.tick(1.second, 1.second, "tick").to(Sink.fromSubscriber(c)).run()
|
||||
val sub = c.expectSubscription()
|
||||
sub.request(3)
|
||||
c.expectNoMsg(600.millis)
|
||||
c.expectNext("tick")
|
||||
c.expectNoMsg(200.millis)
|
||||
sub.request(2)
|
||||
c.expectNext("tick")
|
||||
c.expectNoMsg(200.millis)
|
||||
c.expectNext("tick")
|
||||
|
|
@ -84,16 +81,14 @@ class TickSourceSpec extends StreamSpec {
|
|||
|
||||
"be possible to cancel" in assertAllStagesStopped {
|
||||
val c = TestSubscriber.manualProbe[String]()
|
||||
val tickSource = Source.tick(1.second, 500.millis, "tick")
|
||||
val tickSource = Source.tick(1.second, 1.second, "tick")
|
||||
val cancellable = tickSource.to(Sink.fromSubscriber(c)).run()
|
||||
val sub = c.expectSubscription()
|
||||
sub.request(3)
|
||||
sub.request(2)
|
||||
c.expectNoMsg(600.millis)
|
||||
c.expectNext("tick")
|
||||
c.expectNoMsg(200.millis)
|
||||
c.expectNext("tick")
|
||||
c.expectNoMsg(200.millis)
|
||||
c.expectNext("tick")
|
||||
cancellable.cancel()
|
||||
awaitCond(cancellable.isCancelled)
|
||||
sub.request(3)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue