=str #18189: Explicitly disallow absorbTermination() from onDownstreamFinish()

This commit is contained in:
Endre Sándor Varga 2015-09-02 16:46:37 +02:00
parent 7bdfd4e50f
commit 62a299f3a5
4 changed files with 33 additions and 13 deletions

View file

@ -4,6 +4,7 @@
package akka.stream.impl.fusing
import akka.stream.stage._
import akka.testkit.EventFilter
import scala.util.control.NoStackTrace
import akka.stream.Supervision
@ -518,16 +519,6 @@ class InterpreterSpec extends InterpreterSpecKit {
} should be(true)
}
"implement expand-filter" in pending
"implement take-conflate" in pending
"implement conflate-take" in pending
"implement take-expand" in pending
"implement expand-take" in pending
"implement take-take" in new TestSetup(Seq(
Take(1),
Take(1))) {
@ -554,11 +545,22 @@ class InterpreterSpec extends InterpreterSpecKit {
}
"implement take-drop" in pending
class InvalidAbsorbTermination extends PushPullStage[Int, Int] {
override def onPull(ctx: Context[Int]): SyncDirective = ctx.pull()
override def onPush(elem: Int, ctx: Context[Int]): SyncDirective = ctx.push(elem)
override def onDownstreamFinish(ctx: Context[Int]): TerminationDirective = ctx.absorbTermination()
}
"implement drop-take" in pending
"not allow absorbTermination from onDownstreamFinish()" in new TestSetup(Seq(
new InvalidAbsorbTermination)) {
lastEvents() should be(Set.empty)
"work with keep-going ops" in pending
EventFilter.error("It is not allowed to call absorbTermination() from onDownstreamFinish.").intercept {
downstream.cancel()
lastEvents() should be(Set(Cancel))
}
}
}