Small RS compatibility fix in cancelled sink (#24749)

* Small TCK fix in cancelled sink

* Fix comments
This commit is contained in:
James Roper 2018-03-19 17:08:36 +11:00 committed by Konrad `ktoso` Malawski
parent e98c77e976
commit f1a5679089
2 changed files with 31 additions and 1 deletions

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2014-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.tck
import akka.stream.scaladsl._
import org.reactivestreams.Subscriber
import org.testng.SkipException
class CancelledSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
override def createSubscriber(): Subscriber[Int] =
Flow[Int].to(Sink.cancelled).runWith(Source.asSubscriber)
override def createElement(element: Int): Int = element
override def required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest() = {
throw new SkipException("Cancelled sink doesn't signal demand")
}
override def required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() = {
throw new SkipException("Cancelled sink doesn't signal demand")
}
override def required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() = {
throw new SkipException("Cancelled sink doesn't signal demand")
}
}

View file

@ -180,12 +180,15 @@ import scala.util.control.NonFatal
case Both(s)
set(Inert)
try tryOnError(s, ex)
finally if (t == null) throw ex // must throw NPE, rule 2:13
finally if (t == null) throw ex // must throw NPE, rule 2.13
case s: Subscriber[_] // spec violation
getAndSet(Inert) match {
case Inert // nothing to be done
case _ ErrorPublisher(ex, "failed-VirtualProcessor").subscribe(s)
}
case _ if t == null
// cancelled before onError(null), must throw NPE, rule 2.13
throw ex
case _ // spec violation or cancellation race, but nothing we can do
}