diff --git a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/CancelledSinkSubscriberTest.scala b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/CancelledSinkSubscriberTest.scala new file mode 100644 index 0000000000..f8c1bd9603 --- /dev/null +++ b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/CancelledSinkSubscriberTest.scala @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2014-2018 Lightbend Inc. + */ + +package akka.stream.tck + +import akka.stream.scaladsl._ +import org.reactivestreams.Subscriber +import org.testng.SkipException + +class CancelledSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { + + override def createSubscriber(): Subscriber[Int] = + Flow[Int].to(Sink.cancelled).runWith(Source.asSubscriber) + + override def createElement(element: Int): Int = element + + override def required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest() = { + throw new SkipException("Cancelled sink doesn't signal demand") + } + override def required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() = { + throw new SkipException("Cancelled sink doesn't signal demand") + } + override def required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() = { + throw new SkipException("Cancelled sink doesn't signal demand") + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 4bb2969033..a03f66903c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -180,12 +180,15 @@ import scala.util.control.NonFatal case Both(s) ⇒ set(Inert) try tryOnError(s, ex) - finally if (t == null) throw ex // must throw NPE, rule 2:13 + finally if (t == null) throw ex // must throw NPE, rule 2.13 case s: Subscriber[_] ⇒ // spec violation getAndSet(Inert) match { case Inert ⇒ // nothing to be done case _ ⇒ ErrorPublisher(ex, "failed-VirtualProcessor").subscribe(s) } + case _ if t == null ⇒ + // cancelled before onError(null), must throw NPE, rule 2.13 + throw ex case _ ⇒ // spec violation or cancellation race, but nothing we can do }