/**
 * Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
 */
package akka.stream.impl.fusing

import akka.stream.impl.ConstantFun
import akka.stream.stage._
import akka.testkit.AkkaSpec
import akka.testkit.EventFilter

import akka.stream._

/**
 * Behavioural tests for linear chains of stages run by the interpreter.
 *
 * Every test instantiates a `OneBoundedSetup` with the stages under test, drives it through the
 * `upstream` / `downstream` handles and, after each step, asserts on the set of events returned
 * by `lastEvents()`.
 *
 * NOTE(review): `OneBoundedSetup`, `lastEvents()` and the event objects (`RequestOne`, `OnNext`,
 * `OnComplete`, `Cancel`, `OnError`) are not defined in this file; presumably they come from
 * `GraphInterpreterSpecKit` -- confirm there.
 */
class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
  import Supervision.stoppingDecider

  /*
   * These tests were written for the previous version of the interpreter, the so called OneBoundedInterpreter.
   * These stages are now properly emulated by the GraphInterpreter and many of the edge cases were relevant to
   * the execution model of the old one. Still, these tests are very valuable, so please do not remove.
   */

  // Shared stage instances reused by several of the take-related tests below.
  val takeOne = Take(1)
  val takeTwo = Take(2)

  "Interpreter" must {

    "implement map correctly" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ x + 1, stoppingDecider))) {
      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(0)
      lastEvents() should be(Set(OnNext(1)))

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(1)
      lastEvents() should be(Set(OnNext(2)))

      upstream.onComplete()
      lastEvents() should be(Set(OnComplete))
    }

    "implement chain of maps correctly" in new OneBoundedSetup[Int](Seq(
      Map((x: Int) ⇒ x + 1, stoppingDecider),
      Map((x: Int) ⇒ x * 2, stoppingDecider),
      Map((x: Int) ⇒ x + 1, stoppingDecider))) {

      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      // (0 + 1) * 2 + 1 == 3
      upstream.onNext(0)
      lastEvents() should be(Set(OnNext(3)))

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      // (1 + 1) * 2 + 1 == 5
      upstream.onNext(1)
      lastEvents() should be(Set(OnNext(5)))

      downstream.cancel()
      lastEvents() should be(Set(Cancel))
    }

    "work with only boundary ops" in new OneBoundedSetup[Int](Seq.empty) {
      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(0)
      lastEvents() should be(Set(OnNext(0)))

      upstream.onComplete()
      lastEvents() should be(Set(OnComplete))
    }

    "implement one-to-many many-to-one chain correctly" in new OneBoundedSetup[Int](
      Doubler(),
      Filter((x: Int) ⇒ x != 0)) {

      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      // 0 does not satisfy the filter predicate, so nothing reaches downstream and upstream is asked again
      upstream.onNext(0)
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(1)
      lastEvents() should be(Set(OnNext(1)))

      // Second copy of the element produced by the Doubler, no upstream request needed
      downstream.requestOne()
      lastEvents() should be(Set(OnNext(1)))

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onComplete()
      lastEvents() should be(Set(OnComplete))
    }

    "implement many-to-one one-to-many chain correctly" in new OneBoundedSetup[Int](
      Filter((x: Int) ⇒ x != 0),
      Doubler()) {

      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      // 0 does not satisfy the filter predicate, so upstream is asked again
      upstream.onNext(0)
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(1)
      lastEvents() should be(Set(OnNext(1)))

      // Second copy of the element produced by the Doubler
      downstream.requestOne()
      lastEvents() should be(Set(OnNext(1)))

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      downstream.cancel()
      lastEvents() should be(Set(Cancel))
    }

    "implement take" in new OneBoundedSetup[Int](takeTwo) {

      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(0)
      lastEvents() should be(Set(OnNext(0)))

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      // The second element exhausts Take(2): it is delivered together with completion downstream
      // and cancellation upstream
      upstream.onNext(1)
      lastEvents() should be(Set(OnNext(1), Cancel, OnComplete))
    }

    "implement take inside a chain" in new OneBoundedSetup[Int](
      Filter((x: Int) ⇒ x != 0),
      takeTwo,
      Map((x: Int) ⇒ x + 1, stoppingDecider).toGS) {

      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      // Filtered out, does not count towards the take limit
      upstream.onNext(0)
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(1)
      lastEvents() should be(Set(OnNext(2)))

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(2)
      lastEvents() should be(Set(Cancel, OnComplete, OnNext(3)))
    }

    "implement fold" in new OneBoundedSetup[Int](Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x, stoppingDecider))) {
      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      // While upstream is active every element only results in a further request
      upstream.onNext(0)
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(1)
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(2)
      lastEvents() should be(Set(RequestOne))

      // The aggregate (0 + 1 + 2) is only emitted once upstream completes
      upstream.onComplete()
      lastEvents() should be(Set(OnNext(3), OnComplete))
    }

    "implement fold with proper cancel" in new OneBoundedSetup[Int](Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x, stoppingDecider))) {

      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(0)
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(1)
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(2)
      lastEvents() should be(Set(RequestOne))

      // Cancelling must not emit the partial aggregate
      downstream.cancel()
      lastEvents() should be(Set(Cancel))
    }

    "work if fold completes while not in a push position" in new OneBoundedSetup[Int](Seq(Fold(0, (agg: Int, x: Int) ⇒ agg + x, stoppingDecider))) {

      lastEvents() should be(Set.empty)

      // Completion arrives before any demand: nothing is observable yet
      upstream.onComplete()
      lastEvents() should be(Set.empty)

      // First demand delivers the initial aggregate together with completion
      downstream.requestOne()
      lastEvents() should be(Set(OnComplete, OnNext(0)))
    }

    "implement grouped" in new OneBoundedSetup[Int](Seq(Grouped(3))) {
      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(0)
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(1)
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(2)
      lastEvents() should be(Set(OnNext(Vector(0, 1, 2))))

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(3)
      lastEvents() should be(Set(RequestOne))

      // An incomplete group is flushed when upstream completes
      upstream.onComplete()
      lastEvents() should be(Set(OnNext(Vector(3)), OnComplete))
    }

    "implement batch (conflate)" in new OneBoundedSetup[Int](Batch(
      1L,
      ConstantFun.zeroLong,
      (in: Int) ⇒ in,
      (agg: Int, x: Int) ⇒ agg + x)) {

      // An upstream request is observed before downstream has signalled any demand
      lastEvents() should be(Set(RequestOne))

      downstream.requestOne()
      lastEvents() should be(Set.empty)

      upstream.onNext(0)
      lastEvents() should be(Set(OnNext(0), RequestOne))

      // Without downstream demand the elements are aggregated (1 + 2)
      upstream.onNext(1)
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(2)
      lastEvents() should be(Set(RequestOne))

      downstream.requestOne()
      lastEvents() should be(Set(OnNext(3)))

      downstream.requestOne()
      lastEvents() should be(Set.empty)

      upstream.onNext(4)
      lastEvents() should be(Set(OnNext(4), RequestOne))

      downstream.cancel()
      lastEvents() should be(Set(Cancel))
    }

    "implement expand" in new OneBoundedSetup[Int](new Expand(Iterator.continually(_: Int))) {

      // An upstream request is observed before downstream has signalled any demand
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(0)
      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne, OnNext(0)))

      // No new upstream element yet: the previous one is repeated
      downstream.requestOne()
      lastEvents() should be(Set(OnNext(0)))

      upstream.onNext(1)
      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne, OnNext(1)))

      downstream.requestOne()
      lastEvents() should be(Set(OnNext(1)))

      upstream.onComplete()
      lastEvents() should be(Set(OnComplete))
    }

    "work with batch-batch (conflate-conflate)" in new OneBoundedSetup[Int](
      Batch(
        1L,
        ConstantFun.zeroLong,
        (in: Int) ⇒ in,
        (agg: Int, x: Int) ⇒ agg + x),
      Batch(
        1L,
        ConstantFun.zeroLong,
        (in: Int) ⇒ in,
        (agg: Int, x: Int) ⇒ agg + x)) {

      lastEvents() should be(Set(RequestOne))

      downstream.requestOne()
      lastEvents() should be(Set.empty)

      upstream.onNext(0)
      lastEvents() should be(Set(OnNext(0), RequestOne))

      upstream.onNext(1)
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(2)
      lastEvents() should be(Set(RequestOne))

      downstream.requestOne()
      lastEvents() should be(Set(OnNext(3)))

      downstream.requestOne()
      lastEvents() should be(Set.empty)

      upstream.onNext(4)
      lastEvents() should be(Set(OnNext(4), RequestOne))

      downstream.cancel()
      lastEvents() should be(Set(Cancel))
    }

    "work with expand-expand" in new OneBoundedSetup[Int](
      new Expand(Iterator.from),
      new Expand(Iterator.from)) {

      lastEvents() should be(Set(RequestOne))

      upstream.onNext(0)
      lastEvents() should be(Set(RequestOne))

      downstream.requestOne()
      lastEvents() should be(Set(OnNext(0)))

      downstream.requestOne()
      lastEvents() should be(Set(OnNext(1)))

      upstream.onNext(10)
      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne, OnNext(2))) // One element is still in the pipeline

      downstream.requestOne()
      lastEvents() should be(Set(OnNext(10)))

      downstream.requestOne()
      lastEvents() should be(Set(OnNext(11)))

      upstream.onComplete()
      downstream.requestOne()
      // This is correct! If you don't believe, run the interpreter with Debug on
      lastEvents() should be(Set(OnComplete, OnNext(12)))
    }

    "implement batch-expand (conflate-expand)" in new OneBoundedSetup[Int](
      Batch(
        1L,
        ConstantFun.zeroLong,
        (in: Int) ⇒ in,
        (agg: Int, x: Int) ⇒ agg + x),
      new Expand(Iterator.continually(_: Int))) {

      lastEvents() should be(Set(RequestOne))

      upstream.onNext(0)
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(1)
      lastEvents() should be(Set(RequestOne))

      downstream.requestOne()
      lastEvents() should be(Set(OnNext(0)))

      downstream.requestOne()
      lastEvents() should be(Set(OnNext(1)))

      downstream.requestOne()
      lastEvents() should be(Set(OnNext(1)))

      upstream.onNext(2)
      lastEvents() should be(Set(RequestOne))

      downstream.requestOne()
      lastEvents() should be(Set(OnNext(2)))

      downstream.cancel()
      lastEvents() should be(Set(Cancel))
    }

    "implement doubler-conflate (doubler-batch)" in new OneBoundedSetup[Int](
      Doubler(),
      Batch(
        1L,
        ConstantFun.zeroLong,
        (in: Int) ⇒ in,
        (agg: Int, x: Int) ⇒ agg + x)) {
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(1)
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(2)
      lastEvents() should be(Set(RequestOne))

      // Every element was doubled before being summed: 1 + 1 + 2 + 2 == 6
      downstream.requestOne()
      lastEvents() should be(Set(OnNext(6)))
    }

    // Note, the new interpreter has no jumpback table, still did not want to remove the test
    "work with jumpback table and completed elements" in new OneBoundedSetup[Int](
      Map((x: Int) ⇒ x, stoppingDecider).toGS,
      Map((x: Int) ⇒ x, stoppingDecider).toGS,
      KeepGoing(),
      Map((x: Int) ⇒ x, stoppingDecider).toGS,
      Map((x: Int) ⇒ x, stoppingDecider).toGS) {

      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(1)
      lastEvents() should be(Set(OnNext(1)))

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(2)
      lastEvents() should be(Set(OnNext(2)))

      // KeepGoing does not propagate the completion ...
      upstream.onComplete()
      lastEvents() should be(Set.empty)

      // ... and keeps re-emitting the last element it saw on every further request
      downstream.requestOne()
      lastEvents() should be(Set(OnNext(2)))

      downstream.requestOne()
      lastEvents() should be(Set(OnNext(2)))
    }

    "work with pushAndFinish if upstream completes with pushAndFinish" in new OneBoundedSetup[Int](Seq(
      new PushFinishStage)) {

      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onNextAndComplete(0)
      lastEvents() should be(Set(OnNext(0), OnComplete))
    }

    "work with pushAndFinish if indirect upstream completes with pushAndFinish" in new OneBoundedSetup[Int](Seq(
      Map((x: Any) ⇒ x, stoppingDecider),
      new PushFinishStage,
      Map((x: Any) ⇒ x, stoppingDecider))) {

      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onNextAndComplete(1)
      lastEvents() should be(Set(OnNext(1), OnComplete))
    }

    "work with pushAndFinish if upstream completes with pushAndFinish and downstream immediately pulls" in new OneBoundedSetup[Int](Seq(
      new PushFinishStage,
      Fold(0, (x: Int, y: Int) ⇒ x + y, stoppingDecider))) {

      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onNextAndComplete(1)
      lastEvents() should be(Set(OnNext(1), OnComplete))
    }

    // The anonymous stage absorbs upstream termination and pulls on every callback; the test
    // expects this to be logged once as an IllegalArgumentException and surfaced as OnError only.
    "report error if pull is called while op is terminating" in new OneBoundedSetup[Int](Seq(new PushPullStage[Any, Any] {
      override def onPull(ctx: Context[Any]): SyncDirective = ctx.pull()
      override def onPush(elem: Any, ctx: Context[Any]): SyncDirective = ctx.pull()
      override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective = ctx.absorbTermination()
    })) {
      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      EventFilter[IllegalArgumentException](pattern = ".*Cannot pull closed port.*", occurrences = 1).intercept {
        upstream.onComplete()
      }
      val ev = lastEvents()
      ev.nonEmpty should be(true)
      ev.forall {
        case OnError(_: IllegalArgumentException) ⇒ true
        case _                                    ⇒ false
      } should be(true)
    }

    "implement take-take" in new OneBoundedSetup[Int](
      takeOne,
      takeOne) {
      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(1)
      lastEvents() should be(Set(OnNext(1), OnComplete, Cancel))
    }

    "implement take-take with pushAndFinish from upstream" in new OneBoundedSetup[Int](
      takeOne,
      takeOne) {
      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      // Upstream is already complete here, so no Cancel is expected (unlike "implement take-take")
      upstream.onNextAndComplete(1)
      lastEvents() should be(Set(OnNext(1), OnComplete))
    }

    /**
     * Pass-through stage that (illegally) answers downstream cancellation with
     * `absorbTermination()`; used by the test right below.
     */
    class InvalidAbsorbTermination extends PushPullStage[Int, Int] {
      override def onPull(ctx: Context[Int]): SyncDirective = ctx.pull()
      override def onPush(elem: Int, ctx: Context[Int]): SyncDirective = ctx.push(elem)
      override def onDownstreamFinish(ctx: Context[Int]): TerminationDirective = ctx.absorbTermination()
    }

    "not allow absorbTermination from onDownstreamFinish()" in new OneBoundedSetup[Int](Seq(
      new InvalidAbsorbTermination)) {
      lastEvents() should be(Set.empty)

      // The misuse must be logged exactly once, while the cancellation still reaches upstream
      EventFilter[UnsupportedOperationException]("It is not allowed to call absorbTermination() from onDownstreamFinish.", occurrences = 1).intercept {
        downstream.cancel()
        lastEvents() should be(Set(Cancel))
      }

    }

  }

  /**
   * Test stage that emits every incoming element twice: once immediately when it is pushed,
   * and once more on the following downstream pull (without pulling upstream).
   */
  private[akka] final case class Doubler[T]() extends GraphStage[FlowShape[T, T]] {
    val out: Outlet[T] = Outlet("Doubler.out")
    val in: Inlet[T] = Inlet("Doubler.in")

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with InHandler with OutHandler {
        // Most recently grabbed element
        var latest: T = _
        // True while the second copy of `latest` still has to be emitted
        var oneMore = false

        override def onPush(): Unit = {
          latest = grab(in)
          oneMore = true
          push(out, latest)
        }

        /**
         * Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]]
         * is now allowed to be called on this port.
         */
        override def onPull(): Unit = {
          if (oneMore) {
            push(out, latest)
            oneMore = false
          } else {
            pull(in)
          }
        }

        setHandlers(in, out, this)
      }

    override val shape: FlowShape[T, T] = FlowShape(in, out)
  }

  /**
   * Test stage that forwards elements unchanged and remembers the last one. Once its inlet is
   * closed it answers every pull by pushing that last element again.
   */
  private[akka] final case class KeepGoing[T]() extends GraphStage[FlowShape[T, T]] {
    val in = Inlet[T]("KeepGoing.in")
    val out = Outlet[T]("KeepGoing.out")

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with InHandler with OutHandler {
        // Last element received from upstream
        var lastElem: T = _

        override def onPush(): Unit = {
          lastElem = grab(in)
          push(out, lastElem)
        }

        override def onPull(): Unit = {
          if (isClosed(in)) {
            push(out, lastElem)
          } else {
            pull(in)
          }
        }

        // Deliberately a no-op: upstream completion is not acted upon here
        override def onUpstreamFinish(): Unit = {}

        setHandlers(in, out, this)
      }

    override val shape: FlowShape[T, T] = FlowShape(in, out)
  }

  // This test is related to issue #17351
  /**
   * Stage that finishes together with the first element it pushes and fails if
   * `onUpstreamFinish` is ever invoked on it.
   *
   * @param onPostStop callback invoked from `postStop()`; defaults to a no-op
   */
  private[akka] class PushFinishStage(onPostStop: () ⇒ Unit = () ⇒ ()) extends PushStage[Any, Any] {
    override def onPush(elem: Any, ctx: Context[Any]): SyncDirective =
      ctx.pushAndFinish(elem)

    override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective =
      ctx.fail(akka.stream.testkit.Utils.TE("Cannot happen"))

    override def postStop(): Unit =
      onPostStop()
  }

}