From 23c533fdd528a30c78764b58324a1769f0b28135 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 3 Mar 2015 10:57:25 +0100 Subject: [PATCH] =str #16751 Update to reactive-streams 1.0-RC3 Changed rules: * 1.9 Always onSubscribe prior to any other signals * 1.9 NullPointerException if subscriber is null * 3.17 Long overflow, effectively unbounded instead of onError Fixed some more things: * fixed some FIXME * Long drop and take * memory leaks in tck tests, use BeforeClass to create ActorSystem use AfterClass to shutdown ActorSystem * more tck tests * don't emit OnComplete when substream is cancelled * work around for memory leak in PrefixAndTail --- .../scala/akka/http/ClientServerSpec.scala | 4 +- .../akka/stream/tck/ActorSubscriberTest.scala | 3 +- .../stream/tck/ActorSystemLifecycle.scala | 44 ++++++++++ .../AkkaIdentityProcessorVerification.scala | 50 +++++------- .../tck/AkkaPublisherVerification.scala | 36 ++++----- .../tck/AkkaSubscriberVerification.scala | 49 +++-------- .../stream/tck/BlackholeSubscriberTest.scala | 5 +- .../scala/akka/stream/tck/ConcatTest.scala | 18 +++++ .../akka/stream/tck/EmptyPublisherTest.scala | 18 +++++ .../scala/akka/stream/tck/FlattenTest.scala | 21 +++++ .../stream/tck/FoldSinkSubscriberTest.scala | 15 ++++ .../tck/ForeachSinkSubscriberTest.scala | 15 ++++ .../stream/tck/FusableProcessorTest.scala | 14 +--- .../akka/stream/tck/FuturePublisherTest.scala | 2 +- .../scala/akka/stream/tck/GroupByTest.scala | 28 +++++++ .../stream/tck/HeadSinkSubscriberTest.scala | 3 +- .../stream/tck/IterablePublisherTest.scala | 19 ++--- .../akka/stream/tck/LazyEmptySourceTest.scala | 18 +++++ .../scala/akka/stream/tck/MapAsyncTest.scala | 10 +-- .../stream/tck/MapAsyncUnorderedTest.scala | 26 ++++++ .../test/scala/akka/stream/tck/MapTest.scala | 21 +++++ .../akka/stream/tck/PrefixAndTailTest.scala | 21 +++++ .../stream/tck/SingleElementSourceTest.scala | 18 +++++ .../scala/akka/stream/tck/SplitWhenTest.scala | 24 ++++++ .../tck/SyncIterablePublisherTest.scala | 7 +- .../test/scala/akka/stream/tck/Timeouts.scala | 3 +- .../stream/tck/TransformProcessorTest.scala | 10 +-- .../akka/stream/testkit/StreamTestKit.scala | 39 ++++----- .../akka/stream/testkit/TwoStreamsSetup.scala | 14 ++-- .../stream/actor/ActorPublisherSpec.scala | 9 ++- .../scala/akka/stream/io/StreamTcpSpec.scala | 6 +- .../stream/scaladsl/FlowFromFutureSpec.scala | 4 +- .../stream/scaladsl/FlowGroupBySpec.scala | 2 +- .../stream/scaladsl/FlowIteratorSpec.scala | 6 +- .../stream/scaladsl/FlowMapAsyncSpec.scala | 20 +++++ .../scaladsl/FlowMapAsyncUnorderedSpec.scala | 20 +++++ .../scaladsl/FlowPrefixAndTailSpec.scala | 6 +- .../scala/akka/stream/scaladsl/FlowSpec.scala | 5 +- .../stream/scaladsl/FlowSupervisionSpec.scala | 15 ++++ .../stream/scaladsl/GraphConcatSpec.scala | 8 +- .../stream/scaladsl/GraphFlexiMergeSpec.scala | 2 +- .../akka/stream/scaladsl/GraphZipSpec.scala | 16 ++-- .../stream/scaladsl/GraphZipWithSpec.scala | 16 ++-- .../akka/stream/scaladsl/SourceSpec.scala | 10 +-- .../akka/stream/scaladsl/TickSourceSpec.scala | 2 +- .../akka/stream/actor/ActorPublisher.scala | 24 ++++-- .../akka/stream/actor/ActorSubscriber.scala | 16 +++- .../akka/stream/impl/ActorProcessor.scala | 46 +++++++---- .../akka/stream/impl/ActorPublisher.scala | 15 ++-- .../stream/impl/BlackholeSubscriber.scala | 7 +- .../stream/impl/CompletedPublishers.scala | 33 +++++++- .../main/scala/akka/stream/impl/FanIn.scala | 20 +++-- .../main/scala/akka/stream/impl/FanOut.scala | 17 ++-- .../akka/stream/impl/FanoutProcessor.scala | 10 ++- .../akka/stream/impl/FuturePublisher.scala | 2 +- .../stream/impl/MapAsyncProcessorImpl.scala | 5 +- .../impl/MapAsyncUnorderedProcessorImpl.scala | 5 +- .../impl/ReactiveStreamsCompliance.scala | 53 +++++++++--- .../main/scala/akka/stream/impl/Sinks.scala | 17 +++- .../main/scala/akka/stream/impl/Sources.scala | 13 ++- .../stream/impl/SplitWhenProcessorImpl.scala | 5 ++ .../main/scala/akka/stream/impl/Stages.scala | 8 +- .../impl/StreamOfStreamProcessors.scala | 81 ++++++++++++++----- .../impl/StreamSubscriptionTimeout.scala | 15 +++- .../stream/impl/SubscriberManagement.scala | 64 +++++++-------- .../impl/SynchronousIterablePublisher.scala | 12 +-- .../akka/stream/impl/TickPublisher.scala | 7 +- .../scala/akka/stream/impl/Transfer.scala | 3 +- .../stream/impl/fusing/ActorInterpreter.scala | 10 +-- .../akka/stream/impl/fusing/Interpreter.scala | 7 ++ .../scala/akka/stream/impl/fusing/Ops.scala | 8 +- .../stream/impl/io/DelayedInitProcessor.scala | 24 ++++-- .../stream/impl/io/TcpConnectionStream.scala | 12 ++- .../stream/impl/io/TcpListenStreamActor.scala | 8 +- .../main/scala/akka/stream/javadsl/Flow.scala | 4 +- .../scala/akka/stream/javadsl/Source.scala | 4 +- .../scala/akka/stream/scaladsl/Flow.scala | 6 +- .../stream/scaladsl/OperationAttributes.scala | 11 --- .../akka/stream/scaladsl/StreamTcp.scala | 2 + 79 files changed, 862 insertions(+), 414 deletions(-) create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/ActorSystemLifecycle.scala create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/EmptyPublisherTest.scala create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/FoldSinkSubscriberTest.scala create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/ForeachSinkSubscriberTest.scala create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/LazyEmptySourceTest.scala create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/MapAsyncUnorderedTest.scala create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala index 910215bd4b..f3291920df 100644 --- a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -56,11 +56,11 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val probe2 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() an[BindFailedException] shouldBe thrownBy { Await.result(binding.to(Sink(probe2)).run(), 3.seconds) } - probe2.expectErrorOrSubscriptionFollowedByError() + probe2.expectSubscriptionAndError() val probe3 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() an[BindFailedException] shouldBe thrownBy { Await.result(binding.to(Sink(probe3)).run(), 3.seconds) } - probe3.expectErrorOrSubscriptionFollowedByError() + probe3.expectSubscriptionAndError() // Now unbind the first Await.result(b1.unbind(), 1.second) diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/ActorSubscriberTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/ActorSubscriberTest.scala index 899e0bbe2d..4e7f1d351d 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/ActorSubscriberTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/ActorSubscriberTest.scala @@ -24,6 +24,5 @@ class ActorSubscriberOneByOneRequestTest extends AkkaSubscriberBlackboxVerificat ActorSubscriber(system.actorOf(props.withDispatcher("akka.test.stream-dispatcher"))) } - override def createHelperPublisher(elements: Long) = - createSimpleIntPublisher(elements) + override def createElement(element: Int): Int = element } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/ActorSystemLifecycle.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/ActorSystemLifecycle.scala new file mode 100644 index 0000000000..90984a797a --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/ActorSystemLifecycle.scala @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.tck + +import java.util.concurrent.TimeoutException +import scala.concurrent.duration._ +import akka.actor.ActorSystem +import akka.actor.ActorSystemImpl +import org.testng.annotations.AfterClass +import akka.stream.testkit.AkkaSpec +import akka.event.Logging +import akka.testkit.TestEvent +import akka.testkit.EventFilter +import org.testng.annotations.BeforeClass + +trait ActorSystemLifecycle { + + private var _system: ActorSystem = _ + + final def system: ActorSystem = _system + + def shutdownTimeout: FiniteDuration = 10.seconds + + @BeforeClass + def createActorSystem(): Unit = { + _system = ActorSystem(Logging.simpleName(getClass), AkkaSpec.testConf) + _system.eventStream.publish(TestEvent.Mute(EventFilter[RuntimeException]("Test exception"))) + } + + @AfterClass + def shutdownActorSystem(): Unit = { + try { + system.shutdown() + system.awaitTermination(shutdownTimeout) + } catch { + case _: TimeoutException ⇒ + val msg = "Failed to stop [%s] within [%s] \n%s".format(system.name, shutdownTimeout, + system.asInstanceOf[ActorSystemImpl].printTree) + throw new RuntimeException(msg) + } + } + +} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala index 97ecf6a23e..d14b85c342 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala @@ -3,51 +3,32 @@ */ package akka.stream.tck -import akka.event.Logging - -import scala.collection.{ mutable, immutable } -import akka.actor.ActorSystem +import java.util.concurrent.Executors +import java.util.concurrent.ExecutorService +import java.util.concurrent.TimeUnit +import scala.concurrent.duration._ import akka.stream.ActorFlowMaterializer import akka.stream.scaladsl.{ Flow, Sink, Source } -import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit -import akka.testkit.EventFilter -import akka.testkit.TestEvent import org.reactivestreams.{ Subscriber, Subscription, Processor, Publisher } import org.reactivestreams.tck.IdentityProcessorVerification import org.reactivestreams.tck.TestEnvironment import org.scalatest.testng.TestNGSuiteLike +import org.testng.annotations.AfterClass -abstract class AkkaIdentityProcessorVerification[T](val system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long) +abstract class AkkaIdentityProcessorVerification[T](env: TestEnvironment, publisherShutdownTimeout: Long) extends IdentityProcessorVerification[T](env, publisherShutdownTimeout) - with TestNGSuiteLike { + with TestNGSuiteLike with ActorSystemLifecycle { - system.eventStream.publish(TestEvent.Mute(EventFilter[RuntimeException]("Test exception"))) + def this(printlnDebug: Boolean) = + this(new TestEnvironment(Timeouts.defaultTimeoutMillis, printlnDebug), Timeouts.publisherShutdownTimeoutMillis) - def this(system: ActorSystem, printlnDebug: Boolean) { - this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug), Timeouts.publisherShutdownTimeoutMillis) - } - - def this(printlnDebug: Boolean) { - this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug) - } - - def this() { - this(false) - } + def this() = this(false) override def createErrorStatePublisher(): Publisher[T] = StreamTestKit.errorPublisher(new Exception("Unable to serve subscribers right now!")) - def createSimpleIntPublisher(elements: Long)(implicit mat: ActorFlowMaterializer): Publisher[Int] = { - val iterable: immutable.Iterable[Int] = - if (elements == Long.MaxValue) 1 to Int.MaxValue - else 0 until elements.toInt - - Source(iterable).runWith(Sink.publisher()) - } - - def processorFromFlow[T](flow: Flow[T, T, _])(implicit mat: ActorFlowMaterializer): Processor[T, T] = { + def processorFromFlow(flow: Flow[T, T, _])(implicit mat: ActorFlowMaterializer): Processor[T, T] = { val (sub: Subscriber[T], pub: Publisher[T]) = flow.runWith(Source.subscriber[T](), Sink.publisher[T]()) new Processor[T, T] { @@ -61,4 +42,13 @@ abstract class AkkaIdentityProcessorVerification[T](val system: ActorSystem, env /** By default Akka Publishers do not support Fanout! */ override def maxSupportedSubscribers: Long = 1L + + override lazy val publisherExecutorService: ExecutorService = + Executors.newFixedThreadPool(3) + + @AfterClass + def shutdownPublisherExecutorService(): Unit = { + publisherExecutorService.shutdown() + publisherExecutorService.awaitTermination(3, TimeUnit.SECONDS) + } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala index fa19c428aa..bca718232c 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala @@ -3,10 +3,9 @@ */ package akka.stream.tck +import scala.collection.immutable import akka.event.Logging - import scala.concurrent.duration._ - import akka.actor.ActorSystem import akka.stream.ActorFlowMaterializerSettings import akka.stream.ActorFlowMaterializer @@ -16,31 +15,26 @@ import org.reactivestreams.Publisher import org.reactivestreams.tck.{ PublisherVerification, TestEnvironment } import org.scalatest.testng.TestNGSuiteLike import org.testng.annotations.AfterClass +import akka.actor.ActorSystemImpl +import java.util.concurrent.TimeoutException -abstract class AkkaPublisherVerification[T](val system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long) +abstract class AkkaPublisherVerification[T](val env: TestEnvironment, publisherShutdownTimeout: Long) extends PublisherVerification[T](env, publisherShutdownTimeout) - with TestNGSuiteLike { + with TestNGSuiteLike with ActorSystemLifecycle { - def this(system: ActorSystem, printlnDebug: Boolean) { - this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug), Timeouts.publisherShutdownTimeoutMillis) - } + def this(printlnDebug: Boolean) = + this(new TestEnvironment(Timeouts.defaultTimeoutMillis, printlnDebug), Timeouts.publisherShutdownTimeoutMillis) - def this(printlnDebug: Boolean) { - this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug) - } + def this() = this(false) - def this() { - this(false) - } - - implicit val materializer = ActorFlowMaterializer(ActorFlowMaterializerSettings(system).copy(maxInputBufferSize = 512))(system) - - @AfterClass - def shutdownActorSystem(): Unit = { - system.shutdown() - system.awaitTermination(10.seconds) - } + implicit lazy val materializer = ActorFlowMaterializer(ActorFlowMaterializerSettings(system).copy(maxInputBufferSize = 512))(system) override def createErrorStatePublisher(): Publisher[T] = StreamTestKit.errorPublisher(new Exception("Unable to serve subscribers right now!")) + + def iterable(elements: Long): immutable.Iterable[Int] = + if (elements > Int.MaxValue) + new immutable.Iterable[Int] { override def iterator = Iterator from 0 } + else + 0 until elements.toInt } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaSubscriberVerification.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaSubscriberVerification.scala index e793ecdf36..551acc279e 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaSubscriberVerification.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaSubscriberVerification.scala @@ -20,57 +20,28 @@ import org.reactivestreams.tck.TestEnvironment import org.scalatest.testng.TestNGSuiteLike import org.testng.annotations.AfterClass -abstract class AkkaSubscriberBlackboxVerification[T](val system: ActorSystem, env: TestEnvironment) +abstract class AkkaSubscriberBlackboxVerification[T](env: TestEnvironment) extends SubscriberBlackboxVerification[T](env) with TestNGSuiteLike - with AkkaSubscriberVerificationLike { + with AkkaSubscriberVerificationLike with ActorSystemLifecycle { - def this(system: ActorSystem, printlnDebug: Boolean) { - this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug)) - } + def this(printlnDebug: Boolean) = + this(new TestEnvironment(Timeouts.defaultTimeoutMillis, printlnDebug)) - def this(printlnDebug: Boolean) { - this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug) - } - - def this() { - this(false) - } + def this() = this(false) } -abstract class AkkaSubscriberWhiteboxVerification[T](val system: ActorSystem, env: TestEnvironment) +abstract class AkkaSubscriberWhiteboxVerification[T](env: TestEnvironment) extends SubscriberWhiteboxVerification[T](env) with TestNGSuiteLike with AkkaSubscriberVerificationLike { - def this(system: ActorSystem, printlnDebug: Boolean) { - this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug)) - } + def this(printlnDebug: Boolean) = + this(new TestEnvironment(Timeouts.defaultTimeoutMillis, printlnDebug)) - def this(printlnDebug: Boolean) { - this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug) - } - - def this() { - this(false) - } + def this() = this(false) } trait AkkaSubscriberVerificationLike { implicit def system: ActorSystem - implicit val materializer = ActorFlowMaterializer(ActorFlowMaterializerSettings(system)) - - def createSimpleIntPublisher(elements: Long): Publisher[Int] = { - val iterable: immutable.Iterable[Int] = - if (elements == Long.MaxValue) 1 to Int.MaxValue - else 0 until elements.toInt - - Source(iterable).runWith(Sink.publisher()) - } - - @AfterClass - def shutdownActorSystem(): Unit = { - system.shutdown() - system.awaitTermination(10.seconds) - } - + implicit lazy val materializer = ActorFlowMaterializer(ActorFlowMaterializerSettings(system)) } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/BlackholeSubscriberTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/BlackholeSubscriberTest.scala index 91eee0810f..bca19a9c80 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/BlackholeSubscriberTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/BlackholeSubscriberTest.scala @@ -9,9 +9,8 @@ import org.reactivestreams.Subscriber class BlackholeSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { - override def createSubscriber(): Subscriber[Int] = - new BlackholeSubscriber[Int](2) + override def createSubscriber(): Subscriber[Int] = new BlackholeSubscriber[Int](2) - override def createHelperPublisher(elements: Long): Publisher[Int] = createSimpleIntPublisher(elements) + override def createElement(element: Int): Int = element } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala new file mode 100644 index 0000000000..d828c1d741 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/ConcatTest.scala @@ -0,0 +1,18 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import org.reactivestreams.Publisher + +class ConcatTest extends AkkaPublisherVerification[Int] { + + def createPublisher(elements: Long): Publisher[Int] = { + Source(iterable(elements / 2)).concat(Source(iterable((elements + 1) / 2))).runWith(Sink.publisher()) + } + + // FIXME verifyNoAsyncErrors() without delay is wrong in TCK, enable again in RC4 + override def optional_spec111_maySupportMultiSubscribe(): Unit = () +} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/EmptyPublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/EmptyPublisherTest.scala new file mode 100644 index 0000000000..bdacc09bbc --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/EmptyPublisherTest.scala @@ -0,0 +1,18 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.Sink +import akka.stream.impl.EmptyPublisher + +class EmptyPublisherTest extends AkkaPublisherVerification[Int] { + + def createPublisher(elements: Long): Publisher[Int] = EmptyPublisher[Int] + + override def maxElementsFromPublisher(): Long = 0 +} + diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala new file mode 100644 index 0000000000..2f21743546 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import org.reactivestreams.Publisher +import akka.stream.FlattenStrategy + +class FlattenTest extends AkkaPublisherVerification[Int] { + + def createPublisher(elements: Long): Publisher[Int] = { + val s1 = Source(iterable(elements / 2)) + val s2 = Source(iterable((elements + 1) / 2)) + Source(List(s1, s2)).flatten(FlattenStrategy.concat).runWith(Sink.publisher()) + } + + // FIXME verifyNoAsyncErrors() without delay is wrong in TCK, enable again in RC4 + override def optional_spec111_maySupportMultiSubscribe(): Unit = () +} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FoldSinkSubscriberTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FoldSinkSubscriberTest.scala new file mode 100644 index 0000000000..3edba0b6b3 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FoldSinkSubscriberTest.scala @@ -0,0 +1,15 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.scaladsl._ +import org.reactivestreams.Subscriber + +class FoldSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { + + override def createSubscriber(): Subscriber[Int] = + Flow[Int].to(Sink.fold(0)(_ + _)).runWith(Source.subscriber()) + + override def createElement(element: Int): Int = element +} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/ForeachSinkSubscriberTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/ForeachSinkSubscriberTest.scala new file mode 100644 index 0000000000..5e211076be --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/ForeachSinkSubscriberTest.scala @@ -0,0 +1,15 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.scaladsl._ +import org.reactivestreams.Subscriber + +class ForeachSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { + + override def createSubscriber(): Subscriber[Int] = + Flow[Int].to(Sink.foreach { _ ⇒ }).runWith(Source.subscriber()) + + override def createElement(element: Int): Int = element +} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala index 4ffab04cde..eb48886a60 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala @@ -3,17 +3,13 @@ */ package akka.stream.tck -import java.util.concurrent.atomic.AtomicInteger - +import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings } import akka.stream.impl.Stages.Identity import akka.stream.scaladsl.{ OperationAttributes, Flow } -import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings } -import org.reactivestreams.{ Processor, Publisher } +import org.reactivestreams.Processor class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] { - val processorCounter = new AtomicInteger - override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = { val settings = ActorFlowMaterializerSettings(system) .withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize) @@ -25,10 +21,6 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] { Flow[Int].andThen(Identity()).withAttributes(OperationAttributes.name("identity"))) } - override def createHelperPublisher(elements: Long): Publisher[Int] = { - implicit val mat = ActorFlowMaterializer()(system) - - createSimpleIntPublisher(elements)(mat) - } + override def createElement(element: Int): Int = element } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala index 198a099ae3..db829244af 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala @@ -19,4 +19,4 @@ class FuturePublisherTest extends AkkaPublisherVerification[Int] { } override def maxElementsFromPublisher(): Long = 1 -} \ No newline at end of file +} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala new file mode 100644 index 0000000000..afdb231c14 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/GroupByTest.scala @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.tck + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.stream.impl.EmptyPublisher +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import org.reactivestreams.Publisher + +class GroupByTest extends AkkaPublisherVerification[Int] { + + def createPublisher(elements: Long): Publisher[Int] = + if (elements == 0) EmptyPublisher[Int] + else { + val futureGroupSource = + Source(iterable(elements)).groupBy(elem ⇒ "all").map { case (_, group) ⇒ group }.runWith(Sink.head()) + val groupSource = Await.result(futureGroupSource, 3.seconds) + groupSource.runWith(Sink.publisher()) + + } + + // FIXME verifyNoAsyncErrors() without delay is wrong in TCK, enable again in RC4 + override def optional_spec111_maySupportMultiSubscribe(): Unit = () +} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/HeadSinkSubscriberTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/HeadSinkSubscriberTest.scala index a2c380e132..8e83821c28 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/HeadSinkSubscriberTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/HeadSinkSubscriberTest.scala @@ -15,6 +15,5 @@ class HeadSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { override def createSubscriber(): Subscriber[Int] = new HeadSinkSubscriber[Int](Promise[Int]()) - override def createHelperPublisher(elements: Long) = - createSimpleIntPublisher(elements) + override def createElement(element: Int): Int = element } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala index a96e250dae..6cab762130 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala @@ -10,18 +10,11 @@ import org.reactivestreams._ class IterablePublisherTest extends AkkaPublisherVerification[Int] { - def createPublisher(elements: Long): Publisher[Int] = { - val iterable: immutable.Iterable[Int] = - if (elements == Long.MaxValue) - new immutable.Iterable[Int] { override def iterator = Iterator from 0 } - else - 0 until elements.toInt - - Source(iterable).runWith(Sink.publisher()) + override def createPublisher(elements: Long): Publisher[Int] = { + Source(iterable(elements)).runWith(Sink.publisher()) } - override def spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue(): Unit = { - // FIXME: This test needs RC3 - notVerified() - } -} \ No newline at end of file + // FIXME #16983 + override def required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue(): Unit = () + +} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/LazyEmptySourceTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/LazyEmptySourceTest.scala new file mode 100644 index 0000000000..25fd2dfcbb --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/LazyEmptySourceTest.scala @@ -0,0 +1,18 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.Sink + +class LazyEmptySourceTest extends AkkaPublisherVerification[Int] { + + def createPublisher(elements: Long): Publisher[Int] = + Source.lazyEmpty[Int].runWith(Sink.publisher()) + + override def maxElementsFromPublisher(): Long = 0 +} + diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/MapAsyncTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/MapAsyncTest.scala index 2b511b2f84..b15ee1eca8 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/MapAsyncTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/MapAsyncTest.scala @@ -3,8 +3,6 @@ */ package akka.stream.tck -import java.util.concurrent.atomic.AtomicInteger - import akka.stream.scaladsl.{ Flow, OperationAttributes } import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings } import org.reactivestreams.{ Processor, Publisher } @@ -13,8 +11,6 @@ import scala.concurrent.Future class MapAsyncTest extends AkkaIdentityProcessorVerification[Int] { - val processorCounter = new AtomicInteger - override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = { val settings = ActorFlowMaterializerSettings(system) .withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize) @@ -25,10 +21,6 @@ class MapAsyncTest extends AkkaIdentityProcessorVerification[Int] { Flow[Int].mapAsync(Future.successful).withAttributes(OperationAttributes.name("identity"))) } - override def createHelperPublisher(elements: Long): Publisher[Int] = { - implicit val mat = ActorFlowMaterializer()(system) - - createSimpleIntPublisher(elements)(mat) - } + override def createElement(element: Int): Int = element } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/MapAsyncUnorderedTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/MapAsyncUnorderedTest.scala new file mode 100644 index 0000000000..aab1b63dd1 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/MapAsyncUnorderedTest.scala @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.scaladsl.{ Flow, OperationAttributes } +import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings } +import org.reactivestreams.{ Processor, Publisher } + +import scala.concurrent.Future + +class MapAsyncUnorderedTest extends AkkaIdentityProcessorVerification[Int] { + + override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = { + val settings = ActorFlowMaterializerSettings(system) + .withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize) + + implicit val materializer = ActorFlowMaterializer(settings)(system) + + processorFromFlow( + Flow[Int].mapAsyncUnordered(Future.successful).withAttributes(OperationAttributes.name("identity"))) + } + + override def createElement(element: Int): Int = element + +} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala new file mode 100644 index 0000000000..1a29043097 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.ActorFlowMaterializer +import akka.stream.scaladsl.{ Flow, OperationAttributes } +import org.reactivestreams.Processor + +class MapTest extends AkkaIdentityProcessorVerification[Int] { + + override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = { + implicit val materializer = ActorFlowMaterializer()(system) + + processorFromFlow( + Flow[Int].map(elem ⇒ elem).withAttributes(OperationAttributes.name("identity"))) + } + + override def createElement(element: Int): Int = element + +} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala new file mode 100644 index 0000000000..e4c2c540e3 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/PrefixAndTailTest.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.tck + +import scala.collection.immutable +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import org.reactivestreams.Publisher + +class PrefixAndTailTest extends AkkaPublisherVerification[Int] { + + def createPublisher(elements: Long): Publisher[Int] = { + val futureTailSource = Source(iterable(elements)).prefixAndTail(0).map { case (_, tail) ⇒ tail }.runWith(Sink.head()) + val tailSource = Await.result(futureTailSource, 3.seconds) + tailSource.runWith(Sink.publisher()) + } + +} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala new file mode 100644 index 0000000000..1795732779 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementSourceTest.scala @@ -0,0 +1,18 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + +import org.reactivestreams.Publisher + +class SingleElementSourceTest extends AkkaPublisherVerification[Int] { + + def createPublisher(elements: Long): Publisher[Int] = + Source.single(1).runWith(Sink.publisher()) + + override def maxElementsFromPublisher(): Long = 1 +} + diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala new file mode 100644 index 0000000000..c6cb077aec --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SplitWhenTest.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.tck + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.stream.impl.EmptyPublisher +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import org.reactivestreams.Publisher + +class SplitWhenTest extends AkkaPublisherVerification[Int] { + + def createPublisher(elements: Long): Publisher[Int] = + if (elements == 0) EmptyPublisher[Int] + else { + val futureSource = Source(iterable(elements)).splitWhen(elem ⇒ false).runWith(Sink.head()) + val source = Await.result(futureSource, 3.seconds) + source.runWith(Sink.publisher()) + } + +} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SyncIterablePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SyncIterablePublisherTest.scala index e78827b545..df52b6d07b 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SyncIterablePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SyncIterablePublisherTest.scala @@ -14,13 +14,12 @@ class SyncIterablePublisherTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { val iterable: immutable.Iterable[Int] = - if (elements == Long.MaxValue) - new immutable.Iterable[Int] { override def iterator = Iterator from 0 } + if (elements >= 10000) + 0 until 10000 // this publisher is not intended to be used for large collections else 0 until elements.toInt Source(SynchronousIterablePublisher(iterable, "synchronous-iterable-publisher")).runWith(Sink.publisher()) } - override def spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() = notVerified("RS TCK 1.0.0.M3 does not handle sync publishers well") -} \ No newline at end of file +} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/Timeouts.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/Timeouts.scala index f75f97f042..33be0fbf05 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/Timeouts.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/Timeouts.scala @@ -14,7 +14,6 @@ object Timeouts { def publisherShutdownTimeoutMillis: Int = 1000 - def defaultTimeoutMillis(implicit system: ActorSystem): Int = - 500.millis.dilated(system).toMillis.toInt + def defaultTimeoutMillis: Int = 500 } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala index f32fc47fcf..fa5a8d08f4 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala @@ -3,8 +3,6 @@ */ package akka.stream.tck -import java.util.concurrent.atomic.AtomicInteger - import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings } import akka.stream.impl.ActorFlowMaterializerImpl import akka.stream.impl.Stages.Identity @@ -15,8 +13,6 @@ import org.reactivestreams.{ Processor, Publisher } class TransformProcessorTest extends AkkaIdentityProcessorVerification[Int] { - val processorCounter = new AtomicInteger - override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = { val settings = ActorFlowMaterializerSettings(system) .withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize) @@ -31,10 +27,6 @@ class TransformProcessorTest extends AkkaIdentityProcessorVerification[Int] { processorFromFlow(Flow[Int].transform(mkStage)) } - override def createHelperPublisher(elements: Long): Publisher[Int] = { - implicit val mat = ActorFlowMaterializer()(system) - - createSimpleIntPublisher(elements)(mat) - } + override def createElement(element: Int): Int = element } diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala index d0324d6f7b..aec520baad 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala @@ -113,6 +113,23 @@ object StreamTestKit { def expectError(cause: Throwable): Unit = probe.expectMsg(OnError(cause)) def expectError(): Throwable = probe.expectMsgType[OnError].cause + def expectSubscriptionAndError(cause: Throwable): Unit = { + val sub = expectSubscription() + sub.request(1) + expectError(cause) + } + def expectSubscriptionAndError(): Throwable = { + val sub = expectSubscription() + sub.request(1) + expectError() + } + + def expectSubscriptionAndComplete(): Unit = { + val sub = expectSubscription() + sub.request(1) + expectComplete() + } + def expectNextOrError(element: I, cause: Throwable): Either[Throwable, I] = { probe.fishForMessage(hint = s"OnNext($element) or ${cause.getClass.getName}") { case OnNext(n) ⇒ true @@ -123,28 +140,6 @@ object StreamTestKit { } } - def expectErrorOrSubscriptionFollowedByError(cause: Throwable): Unit = { - val t = expectErrorOrSubscriptionFollowedByError() - assert(t == cause, s"expected $cause, found $cause") - } - - def expectErrorOrSubscriptionFollowedByError(): Throwable = - probe.expectMsgPF() { - case s: OnSubscribe ⇒ - s.subscription.request(1) - expectError() - case OnError(cause) ⇒ cause - } - - def expectCompletedOrSubscriptionFollowedByComplete(): Unit = { - probe.expectMsgPF() { - case s: OnSubscribe ⇒ - s.subscription.request(1) - expectComplete() - case OnComplete ⇒ - } - } - def expectNoMsg(): Unit = probe.expectNoMsg() def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala index f5e5acba46..0b08fb137e 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala @@ -53,39 +53,39 @@ abstract class TwoStreamsSetup extends AkkaSpec { def commonTests() = { "work with two immediately completed publishers" in { val subscriber = setup(completedPublisher, completedPublisher) - subscriber.expectCompletedOrSubscriptionFollowedByComplete() + subscriber.expectSubscriptionAndComplete() } "work with two delayed completed publishers" in { val subscriber = setup(soonToCompletePublisher, soonToCompletePublisher) - subscriber.expectCompletedOrSubscriptionFollowedByComplete() + subscriber.expectSubscriptionAndComplete() } "work with one immediately completed and one delayed completed publisher" in { val subscriber = setup(completedPublisher, soonToCompletePublisher) - subscriber.expectCompletedOrSubscriptionFollowedByComplete() + subscriber.expectSubscriptionAndComplete() } "work with two immediately failed publishers" in { val subscriber = setup(failedPublisher, failedPublisher) - subscriber.expectErrorOrSubscriptionFollowedByError(TestException) + subscriber.expectSubscriptionAndError(TestException) } "work with two delayed failed publishers" in { val subscriber = setup(soonToFailPublisher, soonToFailPublisher) - subscriber.expectErrorOrSubscriptionFollowedByError(TestException) + subscriber.expectSubscriptionAndError(TestException) } // Warning: The two test cases below are somewhat implementation specific and might fail if the implementation // is changed. They are here to be an early warning though. "work with one immediately failed and one delayed failed publisher (case 1)" in { val subscriber = setup(soonToFailPublisher, failedPublisher) - subscriber.expectErrorOrSubscriptionFollowedByError(TestException) + subscriber.expectSubscriptionAndError(TestException) } "work with one immediately failed and one delayed failed publisher (case 2)" in { val subscriber = setup(failedPublisher, soonToFailPublisher) - subscriber.expectErrorOrSubscriptionFollowedByError(TestException) + subscriber.expectSubscriptionAndError(TestException) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index 26977196a7..7548d514ce 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -169,7 +169,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { ref ! Err("early err") val s = StreamTestKit.SubscriberProbe[String]() ActorPublisher[String](ref).subscribe(s) - s.expectError.getMessage should be("early err") + s.expectSubscriptionAndError.getMessage should be("early err") } "drop onNext elements after cancel" in { @@ -225,7 +225,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { ref ! Complete val s = StreamTestKit.SubscriberProbe[String]() ActorPublisher[String](ref).subscribe(s) - s.expectComplete + s.expectSubscriptionAndComplete } "only allow one subscriber" in { @@ -236,7 +236,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { s.expectSubscription val s2 = StreamTestKit.SubscriberProbe[String]() ActorPublisher[String](ref).subscribe(s2) - s2.expectError.getClass should be(classOf[IllegalStateException]) + s2.expectSubscriptionAndError.getClass should be(classOf[IllegalStateException]) } "signal onCompete when actor is stopped" in { @@ -325,7 +325,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { // now subscribers will already be rejected, while the actor could perform some clean-up val sub = StreamTestKit.SubscriberProbe() pub.subscribe(sub) - sub.expectError() + sub.expectSubscriptionAndError() expectMsg("cleaned-up") // termination is tiggered by user code @@ -343,6 +343,7 @@ class ActorPublisherSpec extends AkkaSpec with ImplicitSender { // subscribe right away, should cancel subscription-timeout pub.subscribe(sub) + sub.expectSubscription() expectNoMsg() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala index db46c6f1a3..9a3eaa08b8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala @@ -294,7 +294,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val serverConnection = server.waitAccept() serverConnection.abort() - tcpReadProbe.subscriberProbe.expectErrorOrSubscriptionFollowedByError() + tcpReadProbe.subscriberProbe.expectSubscriptionAndError() tcpWriteProbe.tcpWriteSubscription.expectCancellation() serverConnection.expectTerminated() @@ -408,11 +408,11 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val probe2 = StreamTestKit.SubscriberProbe[StreamTcp.IncomingConnection]() val binding2F = bind.to(Sink(probe2)).run() - probe2.expectErrorOrSubscriptionFollowedByError(BindFailedException) + probe2.expectSubscriptionAndError(BindFailedException) val probe3 = StreamTestKit.SubscriberProbe[StreamTcp.IncomingConnection]() val binding3F = bind.to(Sink(probe3)).run() - probe3.expectErrorOrSubscriptionFollowedByError() + probe3.expectSubscriptionAndError() an[BindFailedException] shouldBe thrownBy { Await.result(binding2F, 1.second) } an[BindFailedException] shouldBe thrownBy { Await.result(binding3F, 1.second) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala index 8a2761027f..920f6bda83 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala @@ -35,7 +35,7 @@ class FlowFromFutureSpec extends AkkaSpec { val p = Source(Future.failed[Int](ex)).runWith(Sink.publisher()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) - c.expectError(ex) + c.expectSubscriptionAndError(ex) } "produce one element when Future is completed" in { @@ -119,4 +119,4 @@ class FlowFromFutureSpec extends AkkaSpec { c.expectNoMsg(200.millis) } } -} \ No newline at end of file +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 1a8fad4e61..3542d66439 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -142,7 +142,7 @@ class FlowGroupBySpec extends AkkaSpec { val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, _])]() publisher.subscribe(subscriber) - subscriber.expectCompletedOrSubscriptionFollowedByComplete() + subscriber.expectSubscriptionAndComplete() } "abort on onError from upstream" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala index 1607ae8d86..cefb911598 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala @@ -56,7 +56,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { val p = createSource(immutable.Iterable.empty[Int]).runWith(Sink.publisher()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) - c.expectCompletedOrSubscriptionFollowedByComplete() + c.expectSubscriptionAndComplete() c.expectNoMsg(100.millis) } @@ -167,7 +167,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { val p = createSource(iterable).runWith(Sink.publisher()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) - c.expectErrorOrSubscriptionFollowedByError().getMessage should be("no good iterator") + c.expectSubscriptionAndError().getMessage should be("no good iterator") c.expectNoMsg(100.millis) } @@ -181,7 +181,7 @@ abstract class AbstractFlowIteratorSpec extends AkkaSpec { val p = createSource(iterable).runWith(Sink.publisher()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) - c.expectErrorOrSubscriptionFollowedByError().getMessage should be("no next") + c.expectSubscriptionAndError().getMessage should be("no next") c.expectNoMsg(100.millis) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index 3bddaad8f1..827778e56a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -16,6 +16,7 @@ import akka.testkit.TestLatch import akka.testkit.TestProbe import akka.stream.scaladsl.OperationAttributes.supervisionStrategy import akka.stream.Supervision.resumingDecider +import akka.stream.impl.ReactiveStreamsCompliance class FlowMapAsyncSpec extends AkkaSpec { @@ -137,6 +138,25 @@ class FlowMapAsyncSpec extends AkkaSpec { c.expectComplete() } + "signal NPE when future is completed with null" in { + val c = StreamTestKit.SubscriberProbe[String]() + val p = Source(List("a", "b")).mapAsync(elem ⇒ Future.successful(null)).to(Sink(c)).run() + val sub = c.expectSubscription() + sub.request(10) + c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg) + } + + "resume when future is completed with null" in { + val c = StreamTestKit.SubscriberProbe[String]() + val p = Source(List("a", "b", "c")).section(supervisionStrategy(resumingDecider))( + _.mapAsync(elem ⇒ if (elem == "b") Future.successful(null) else Future.successful(elem))) + .to(Sink(c)).run() + val sub = c.expectSubscription() + sub.request(10) + for (elem ← List("a", "c")) c.expectNext(elem) + c.expectComplete() + } + "should handle cancel properly" in { val pub = StreamTestKit.PublisherProbe[Int]() val sub = StreamTestKit.SubscriberProbe[Int]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index d8bf77afdb..ef374640b4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -17,6 +17,7 @@ import akka.stream.scaladsl.OperationAttributes.supervisionStrategy import akka.stream.Supervision.resumingDecider import akka.stream.testkit.StreamTestKit.OnNext import akka.stream.testkit.StreamTestKit.OnComplete +import akka.stream.impl.ReactiveStreamsCompliance class FlowMapAsyncUnorderedSpec extends AkkaSpec { @@ -133,6 +134,25 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { c.probe.receiveN(5).toSet should be(expected) } + "signal NPE when future is completed with null" in { + val c = StreamTestKit.SubscriberProbe[String]() + val p = Source(List("a", "b")).mapAsyncUnordered(elem ⇒ Future.successful(null)).to(Sink(c)).run() + val sub = c.expectSubscription() + sub.request(10) + c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg) + } + + "resume when future is completed with null" in { + val c = StreamTestKit.SubscriberProbe[String]() + val p = Source(List("a", "b", "c")).section(supervisionStrategy(resumingDecider))( + _.mapAsyncUnordered(elem ⇒ if (elem == "b") Future.successful(null) else Future.successful(elem))) + .to(Sink(c)).run() + val sub = c.expectSubscription() + sub.request(10) + for (elem ← List("a", "c")) c.expectNext(elem) + c.expectComplete() + } + "should handle cancel properly" in { val pub = StreamTestKit.PublisherProbe[Int]() val sub = StreamTestKit.SubscriberProbe[Int]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala index 6f36601d01..cb46c3f2be 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -33,7 +33,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { prefix should be(Nil) val tailSubscriber = SubscriberProbe[Int] tailFlow.to(Sink(tailSubscriber)).run() - tailSubscriber.expectComplete() + tailSubscriber.expectSubscriptionAndComplete() } "work on short input" in { @@ -43,7 +43,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { prefix should be(List(1, 2, 3)) val tailSubscriber = SubscriberProbe[Int] tailFlow.to(Sink(tailSubscriber)).run() - tailSubscriber.expectComplete() + tailSubscriber.expectSubscriptionAndComplete() } "work on longer inputs" in { @@ -87,7 +87,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { val subscriber = StreamTestKit.SubscriberProbe[Int]() tail.to(Sink(subscriber)).run() - subscriber.expectCompletedOrSubscriptionFollowedByComplete() + subscriber.expectSubscriptionAndComplete() } "handle onError when no substream open" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 9a574474ba..8efb707900 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -510,7 +510,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val downstream2 = StreamTestKit.SubscriberProbe[String]() publisher.subscribe(downstream2) - downstream2.expectError() should be(TestException) + downstream2.expectSubscriptionAndError() should be(TestException) } } @@ -524,7 +524,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val downstream2 = StreamTestKit.SubscriberProbe[Any]() publisher.subscribe(downstream2) // IllegalStateException shut down - downstream2.expectError().isInstanceOf[IllegalStateException] should be(true) + downstream2.expectSubscriptionAndError().isInstanceOf[IllegalStateException] should be(true) } } } @@ -572,6 +572,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val downstream3 = StreamTestKit.SubscriberProbe[Any]() publisher.subscribe(downstream3) + downstream3.expectSubscription() // IllegalStateException terminated abruptly checkError(downstream3) } finally { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala index b8af388255..ab7d8d86ac 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala @@ -12,6 +12,7 @@ import scala.util.control.NoStackTrace import scala.concurrent.Await import akka.stream.testkit.StreamTestKit.SubscriberProbe import akka.stream.Supervision +import akka.stream.impl.ReactiveStreamsCompliance class FlowSupervisionSpec extends AkkaSpec { import OperationAttributes.supervisionStrategy @@ -38,5 +39,19 @@ class FlowSupervisionSpec extends AkkaSpec { result should be(List(1, 2, 4, 5)) } + "complete stream with NPE failure when null is emitted" in { + intercept[NullPointerException] { + Await.result(Source(List("a", "b")).map(_ ⇒ null).grouped(1000).runWith(Sink.head()), 3.seconds) + }.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg) + } + + "resume stream when null is emitted" in { + val nullMap = Flow[String].map(elem ⇒ if (elem == "b") null else elem) + .withAttributes(supervisionStrategy(Supervision.resumingDecider)) + val result = Await.result(Source(List("a", "b", "c")).via(nullMap) + .grouped(1000).runWith(Sink.head()), 3.seconds) + result should be(List("a", "c")) + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala index b5d1919a6b..cca1d7c696 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala @@ -97,10 +97,10 @@ class GraphConcatSpec extends TwoStreamsSetup { "work with one immediately failed and one nonempty publisher" in { val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) - subscriber1.expectErrorOrSubscriptionFollowedByError(TestException) + subscriber1.expectSubscriptionAndError(TestException) val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher) - subscriber2.expectErrorOrSubscriptionFollowedByError(TestException) + subscriber2.expectSubscriptionAndError(TestException) } "work with one delayed failed and first nonempty publisher" in { @@ -113,7 +113,7 @@ class GraphConcatSpec extends TwoStreamsSetup { if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(2, TestException).isLeft if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(3, TestException).isLeft if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(4, TestException).isLeft - if (!errorSignalled) subscriber.expectErrorOrSubscriptionFollowedByError(TestException) + if (!errorSignalled) subscriber.expectSubscriptionAndError(TestException) } "work with one delayed failed and second nonempty publisher" in { @@ -126,7 +126,7 @@ class GraphConcatSpec extends TwoStreamsSetup { if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(2, TestException).isLeft if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(3, TestException).isLeft if (!errorSignalled) errorSignalled ||= subscriber.expectNextOrError(4, TestException).isLeft - if (!errorSignalled) subscriber.expectErrorOrSubscriptionFollowedByError(TestException) + if (!errorSignalled) subscriber.expectSubscriptionAndError(TestException) } "correctly handle async errors in secondary upstream" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala index 573e7c3868..d93ed94027 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala @@ -580,7 +580,7 @@ class GraphFlexiMergeSpec extends AkkaSpec { val s = SubscriberProbe[String] p.subscribe(s) - s.expectErrorOrSubscriptionFollowedByError().getMessage should be("ERROR") + s.expectSubscriptionAndError().getMessage should be("ERROR") } "emit failure" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala index 284332174a..27acd20a19 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala @@ -52,34 +52,34 @@ class GraphZipSpec extends TwoStreamsSetup { "work with one immediately completed and one nonempty publisher" in { val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) - subscriber1.expectCompletedOrSubscriptionFollowedByComplete() + subscriber1.expectSubscriptionAndComplete() val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher) - subscriber2.expectCompletedOrSubscriptionFollowedByComplete() + subscriber2.expectSubscriptionAndComplete() } "work with one delayed completed and one nonempty publisher" in { val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) - subscriber1.expectCompletedOrSubscriptionFollowedByComplete() + subscriber1.expectSubscriptionAndComplete() val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher) - subscriber2.expectCompletedOrSubscriptionFollowedByComplete() + subscriber2.expectSubscriptionAndComplete() } "work with one immediately failed and one nonempty publisher" in { val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) - subscriber1.expectErrorOrSubscriptionFollowedByError(TestException) + subscriber1.expectSubscriptionAndError(TestException) val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher) - subscriber2.expectErrorOrSubscriptionFollowedByError(TestException) + subscriber2.expectSubscriptionAndError(TestException) } "work with one delayed failed and one nonempty publisher" in { val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4)) - subscriber1.expectErrorOrSubscriptionFollowedByError(TestException) + subscriber1.expectSubscriptionAndError(TestException) val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher) - val subscription2 = subscriber2.expectErrorOrSubscriptionFollowedByError(TestException) + val subscription2 = subscriber2.expectSubscriptionAndError(TestException) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala index f52f7b5b33..1759030e44 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala @@ -73,34 +73,34 @@ class GraphZipWithSpec extends TwoStreamsSetup { "work with one immediately completed and one nonempty publisher" in { val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) - subscriber1.expectCompletedOrSubscriptionFollowedByComplete() + subscriber1.expectSubscriptionAndComplete() val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher) - subscriber2.expectCompletedOrSubscriptionFollowedByComplete() + subscriber2.expectSubscriptionAndComplete() } "work with one delayed completed and one nonempty publisher" in { val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) - subscriber1.expectCompletedOrSubscriptionFollowedByComplete() + subscriber1.expectSubscriptionAndComplete() val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher) - subscriber2.expectCompletedOrSubscriptionFollowedByComplete() + subscriber2.expectSubscriptionAndComplete() } "work with one immediately failed and one nonempty publisher" in { val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) - subscriber1.expectErrorOrSubscriptionFollowedByError(TestException) + subscriber1.expectSubscriptionAndError(TestException) val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher) - subscriber2.expectErrorOrSubscriptionFollowedByError(TestException) + subscriber2.expectSubscriptionAndError(TestException) } "work with one delayed failed and one nonempty publisher" in { val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4)) - subscriber1.expectErrorOrSubscriptionFollowedByError(TestException) + subscriber1.expectSubscriptionAndError(TestException) val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher) - val subscription2 = subscriber2.expectErrorOrSubscriptionFollowedByError(TestException) + val subscription2 = subscriber2.expectSubscriptionAndError(TestException) } "zipWith a ETA expanded Person.apply (3 inputs)" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index b30d5d0807..6502aede56 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -53,11 +53,11 @@ class SourceSpec extends AkkaSpec { val p = Source.empty.runWith(Sink.publisher()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) - c.expectComplete() + c.expectSubscriptionAndComplete() val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c2) - c2.expectComplete() + c2.expectSubscriptionAndComplete() } } @@ -67,11 +67,11 @@ class SourceSpec extends AkkaSpec { val p = Source.failed(ex).runWith(Sink.publisher()) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) - c.expectError(ex) + c.expectSubscriptionAndError(ex) val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c2) - c2.expectError(ex) + c2.expectSubscriptionAndError(ex) } } @@ -166,4 +166,4 @@ class SourceSpec extends AkkaSpec { } } -} \ No newline at end of file +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala index 0bdaaaf0cf..faa2123abe 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala @@ -56,7 +56,7 @@ class TickSourceSpec extends AkkaSpec { p.subscribe(c1) p.subscribe(c2) val sub1 = c1.expectSubscription() - c2.expectError() + c2.expectSubscriptionAndError() sub1.request(1) c1.expectNext("tick") c1.expectNoMsg(200.millis) diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index cfc1ebc4db..56443c002b 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -19,6 +19,8 @@ import akka.actor.UntypedActor import concurrent.duration.Duration import concurrent.duration.FiniteDuration import akka.actor.DeadLetterSuppression +import akka.stream.impl.CancelledSubscription +import akka.stream.impl.ReactiveStreamsCompliance._ object ActorPublisher { @@ -240,10 +242,9 @@ trait ActorPublisher[T] extends Actor { super.aroundReceive(receive, msg) } else { demand += n - if (demand < 0 && lifecycleState == Active) // Long has overflown - onError(totalPendingDemandMustNotExceedLongMaxValueException) - else - super.aroundReceive(receive, msg) + if (demand < 0) + demand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded + super.aroundReceive(receive, msg) } case Subscribe(sub: Subscriber[_]) ⇒ @@ -253,11 +254,16 @@ trait ActorPublisher[T] extends Actor { subscriber = sub lifecycleState = Active tryOnSubscribe(sub, new ActorPublisherSubscription(self)) - case ErrorEmitted(cause) ⇒ tryOnError(sub, cause) - case Completed ⇒ tryOnComplete(sub) + case ErrorEmitted(cause) ⇒ + tryOnSubscribe(sub, CancelledSubscription) + tryOnError(sub, cause) + case Completed ⇒ + tryOnSubscribe(sub, CancelledSubscription) + tryOnComplete(sub) case Active | Canceled ⇒ + tryOnSubscribe(sub, CancelledSubscription) tryOnError(sub, - if (subscriber eq sub) ReactiveStreamsCompliance.canNotSubscribeTheSameSubscriberMultipleTimesException + if (subscriber == sub) ReactiveStreamsCompliance.canNotSubscribeTheSameSubscriberMultipleTimesException else ReactiveStreamsCompliance.canNotSubscribeTheSameSubscriberMultipleTimesException) } @@ -337,8 +343,10 @@ private[akka] final case class ActorPublisherImpl[T](ref: ActorRef) extends Publ import ActorPublisher._ import ActorPublisher.Internal._ - override def subscribe(sub: Subscriber[_ >: T]): Unit = + override def subscribe(sub: Subscriber[_ >: T]): Unit = { + requireNonNullSubscriber(sub) ref ! Subscribe(sub.asInstanceOf[Subscriber[Any]]) + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala index 2094ef226f..e71652b7e6 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala @@ -15,6 +15,7 @@ import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider import akka.actor.UntypedActor import akka.actor.DeadLetterSuppression +import akka.stream.impl.ReactiveStreamsCompliance object ActorSubscriber { @@ -274,10 +275,19 @@ trait ActorSubscriber extends Actor { */ private[akka] final class ActorSubscriberImpl[T](val impl: ActorRef) extends Subscriber[T] { import ActorSubscriberMessage._ - override def onError(cause: Throwable): Unit = impl ! OnError(cause) + override def onError(cause: Throwable): Unit = { + ReactiveStreamsCompliance.requireNonNullException(cause) + impl ! OnError(cause) + } override def onComplete(): Unit = impl ! OnComplete - override def onNext(element: T): Unit = impl ! OnNext(element) - override def onSubscribe(subscription: Subscription): Unit = impl ! ActorSubscriber.OnSubscribe(subscription) + override def onNext(element: T): Unit = { + ReactiveStreamsCompliance.requireNonNullElement(element) + impl ! OnNext(element) + } + override def onSubscribe(subscription: Subscription): Unit = { + ReactiveStreamsCompliance.requireNonNullSubscription(subscription) + impl ! ActorSubscriber.OnSubscribe(subscription) + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 490fcdf69f..16e8b838f8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -28,10 +28,19 @@ private[akka] object ActorProcessor { */ private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[O](impl) with Processor[I, O] { - override def onSubscribe(s: Subscription): Unit = impl ! OnSubscribe(s) - override def onError(t: Throwable): Unit = impl ! OnError(t) + override def onSubscribe(s: Subscription): Unit = { + ReactiveStreamsCompliance.requireNonNullSubscription(s) + impl ! OnSubscribe(s) + } + override def onError(t: Throwable): Unit = { + ReactiveStreamsCompliance.requireNonNullException(t) + impl ! OnError(t) + } override def onComplete(): Unit = impl ! OnComplete - override def onNext(t: I): Unit = impl ! OnNext(t) + override def onNext(elem: I): Unit = { + ReactiveStreamsCompliance.requireNonNullElement(elem) + impl ! OnNext(elem) + } } /** @@ -162,11 +171,12 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D private val _subreceive = new SubReceive(waitingExposedPublisher) def enqueueOutputElement(elem: Any): Unit = { + ReactiveStreamsCompliance.requireNonNullElement(elem) downstreamDemand -= 1 tryOnNext(subscriber, elem) } - def complete(): Unit = { + override def complete(): Unit = { if (!downstreamCompleted) { downstreamCompleted = true if (exposedPublisher ne null) exposedPublisher.shutdown(None) @@ -174,7 +184,14 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D } } - def cancel(e: Throwable): Unit = { + override def cancel(): Unit = { + if (!downstreamCompleted) { + downstreamCompleted = true + if (exposedPublisher ne null) exposedPublisher.shutdown(None) + } + } + + override def error(e: Throwable): Unit = { if (!downstreamCompleted) { downstreamCompleted = true if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) @@ -182,7 +199,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D } } - def isClosed: Boolean = downstreamCompleted + override def isClosed: Boolean = downstreamCompleted protected def createSubscription(): Subscription = new ActorSubscription(actor, subscriber) @@ -192,7 +209,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D subscriber = sub tryOnSubscribe(subscriber, createSubscription()) } else - tryOnError(sub, new IllegalStateException(s"${Logging.simpleName(this)} ${SupportsOnlyASingleSubscriber}")) + rejectAdditionalSubscriber(sub, s"${Logging.simpleName(this)}") } protected def waitingExposedPublisher: Actor.Receive = { @@ -208,14 +225,12 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D subscribePending(exposedPublisher.takePendingSubscribers()) case RequestMore(subscription, elements) ⇒ if (elements < 1) { - cancel(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException) + error(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException) } else { downstreamDemand += elements - if (downstreamDemand < 1) { // Long has overflown - cancel(ReactiveStreamsCompliance.totalPendingDemandMustNotExceedLongMaxValueException) - } - - pump.pump() // FIXME should this be called even on overflow, sounds like a bug to me + if (downstreamDemand < 1) + downstreamDemand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded + pump.pump() } case Cancel(subscription) ⇒ downstreamCompleted = true @@ -255,11 +270,10 @@ private[akka] abstract class ActorProcessorImpl(val settings: ActorFlowMateriali protected def onError(e: Throwable): Unit = fail(e) protected def fail(e: Throwable): Unit = { - // FIXME: escalate to supervisor if (settings.debugLogging) log.debug("fail due to: {}", e.getMessage) primaryInputs.cancel() - primaryOutputs.cancel(e) + primaryOutputs.error(e) context.stop(self) } @@ -273,7 +287,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: ActorFlowMateriali override def postStop(): Unit = { primaryInputs.cancel() - primaryOutputs.cancel(new IllegalStateException("Processor actor terminated abruptly")) + primaryOutputs.error(new IllegalStateException("Processor actor terminated abruptly")) } override def postRestart(reason: Throwable): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index f776ce4943..a97aa4458e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -47,7 +47,8 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { protected val wakeUpMsg: Any = SubscribePending override def subscribe(subscriber: Subscriber[_ >: T]): Unit = { - @tailrec def doSubscribe(subscriber: Subscriber[_ >: T]): Unit = { + requireNonNullSubscriber(subscriber) + @tailrec def doSubscribe(): Unit = { val current = pendingSubscribers.get if (current eq null) reportSubscribeFailure(subscriber) @@ -55,11 +56,11 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { if (pendingSubscribers.compareAndSet(current, subscriber +: current)) impl ! wakeUpMsg else - doSubscribe(subscriber) // CAS retry + doSubscribe() // CAS retry } } - doSubscribe(subscriber) + doSubscribe() } def takePendingSubscribers(): immutable.Seq[Subscriber[_ >: T]] = { @@ -82,11 +83,11 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { try shutdownReason match { case Some(e: SpecViolation) ⇒ // ok, not allowed to call onError case Some(e) ⇒ - if (shutdownReason eq ActorPublisher.NormalShutdownReason) - (new RuntimeException("BOOM")).printStackTrace() - + tryOnSubscribe(subscriber, CancelledSubscription) tryOnError(subscriber, e) - case None ⇒ tryOnComplete(subscriber) + case None ⇒ + tryOnSubscribe(subscriber, CancelledSubscription) + tryOnComplete(subscriber) } catch { case _: SpecViolation ⇒ // nothing to do } diff --git a/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala index a9de046ea6..8a2355c518 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala @@ -19,15 +19,20 @@ private[akka] class BlackholeSubscriber[T](highWatermark: Int) extends Subscribe private val subscription: AtomicReference[Subscription] = new AtomicReference(null) override def onSubscribe(sub: Subscription): Unit = { + ReactiveStreamsCompliance.requireNonNullSubscription(sub) if (subscription.compareAndSet(null, sub)) requestMore() else sub.cancel() } - override def onError(cause: Throwable): Unit = () + override def onError(cause: Throwable): Unit = { + ReactiveStreamsCompliance.requireNonNullException(cause) + () + } override def onComplete(): Unit = () override def onNext(element: T): Unit = { + ReactiveStreamsCompliance.requireNonNullElement(element) requested -= 1 requestMore() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 6c25283368..8f5fec2b55 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -4,6 +4,7 @@ package akka.stream.impl import org.reactivestreams.{ Subscriber, Publisher } +import org.reactivestreams.Subscription /** * INTERNAL API @@ -11,7 +12,11 @@ import org.reactivestreams.{ Subscriber, Publisher } private[akka] case object EmptyPublisher extends Publisher[Nothing] { import ReactiveStreamsCompliance._ override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = - try tryOnComplete(subscriber) catch { + try { + requireNonNullSubscriber(subscriber) + tryOnSubscribe(subscriber, CancelledSubscription) + tryOnComplete(subscriber) + } catch { case _: SpecViolation ⇒ // nothing we can do } def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] @@ -24,9 +29,33 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] { private[akka] final case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] { import ReactiveStreamsCompliance._ override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = - try tryOnError(subscriber, t) catch { + try { + requireNonNullSubscriber(subscriber) + tryOnSubscribe(subscriber, CancelledSubscription) + tryOnError(subscriber, t) + } catch { case _: SpecViolation ⇒ // nothing we can do } def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] override def toString: String = name } + +/** + * INTERNAL API + * This is only a legal subscription when it is immediately followed by + * a termination signal (onComplete, onError). + */ +private[akka] case object CancelledSubscription extends Subscription { + override def request(elements: Long): Unit = () + override def cancel(): Unit = () +} + +/** + * INTERNAL API + */ +private[akka] case object NullSubscriber extends Subscriber[Any] { + def onComplete(): Unit = () + def onError(cause: Throwable): Unit = () + def onNext(elem: Any): Unit = () + def onSubscribe(s: Subscription): Unit = () +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index c1b7586cbb..2bfab90e87 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -25,10 +25,19 @@ private[akka] object FanIn { final case class OnSubscribe(id: Int, subscription: Subscription) extends DeadLetterSuppression private[akka] final case class SubInput[T](impl: ActorRef, id: Int) extends Subscriber[T] { - override def onError(cause: Throwable): Unit = impl ! OnError(id, cause) + override def onError(cause: Throwable): Unit = { + ReactiveStreamsCompliance.requireNonNullException(cause) + impl ! OnError(id, cause) + } override def onComplete(): Unit = impl ! OnComplete(id) - override def onNext(element: T): Unit = impl ! OnNext(id, element) - override def onSubscribe(subscription: Subscription): Unit = impl ! OnSubscribe(id, subscription) + override def onNext(element: T): Unit = { + ReactiveStreamsCompliance.requireNonNullElement(element) + impl ! OnNext(id, element) + } + override def onSubscribe(subscription: Subscription): Unit = { + ReactiveStreamsCompliance.requireNonNullSubscription(subscription) + impl ! OnSubscribe(id, subscription) + } } abstract class InputBunch(inputCount: Int, bufferSize: Int, pump: Pump) { @@ -226,17 +235,16 @@ private[akka] abstract class FanIn(val settings: ActorFlowMaterializerSettings, override def pumpFailed(e: Throwable): Unit = fail(e) protected def fail(e: Throwable): Unit = { - // FIXME: escalate to supervisor if (settings.debugLogging) log.debug("fail due to: {}", e.getMessage) inputBunch.cancel() - primaryOutputs.cancel(e) + primaryOutputs.error(e) context.stop(self) } override def postStop(): Unit = { inputBunch.cancel() - primaryOutputs.cancel(new IllegalStateException("Processor actor terminated abruptly")) + primaryOutputs.error(new IllegalStateException("Processor actor terminated abruptly")) } override def postRestart(reason: Throwable): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala index a3d52e8604..963ae08464 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala @@ -25,9 +25,7 @@ private[akka] object FanOut { final case class SubstreamSubscribePending(id: Int) extends DeadLetterSuppression class SubstreamSubscription(val parent: ActorRef, val id: Int) extends Subscription { - override def request(elements: Long): Unit = - if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") - else parent ! SubstreamRequestMore(id, elements) + override def request(elements: Long): Unit = parent ! SubstreamRequestMore(id, elements) override def cancel(): Unit = parent ! SubstreamCancel(id) override def toString = "SubstreamSubscription" + System.identityHashCode(this) } @@ -100,7 +98,7 @@ private[akka] object FanOut { def error(output: Int, e: Throwable): Unit = if (!errored(output) && !cancelled(output) && !completed(output)) { - outputs(output).cancel(e) + outputs(output).error(e) errored(output) = true unmarkOutput(output) } @@ -217,9 +215,13 @@ private[akka] object FanOut { } case SubstreamRequestMore(id, demand) ⇒ - if (marked(id) && !pending(id)) markedPending += 1 - pending(id) = true - outputs(id).subreceive(RequestMore(null, demand)) + if (demand < 1) // According to Reactive Streams Spec 3.9, with non-positive demand must yield onError + error(id, ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException) + else { + if (marked(id) && !pending(id)) markedPending += 1 + pending(id) = true + outputs(id).subreceive(RequestMore(null, demand)) + } case SubstreamCancel(id) ⇒ if (unmarkCancelled) { unmarkOutput(id) @@ -256,7 +258,6 @@ private[akka] abstract class FanOut(val settings: ActorFlowMaterializerSettings, override def pumpFailed(e: Throwable): Unit = fail(e) protected def fail(e: Throwable): Unit = { - // FIXME: escalate to supervisor if (settings.debugLogging) log.debug("fail due to: {}", e.getMessage) primaryInputs.cancel() diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index b878fcac03..8aadb00946 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -25,17 +25,20 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu override val subreceive = new SubReceive(waitingExposedPublisher) def enqueueOutputElement(elem: Any): Unit = { + ReactiveStreamsCompliance.requireNonNullElement(elem) downstreamBufferSpace -= 1 pushToDownstream(elem) } - def complete(): Unit = + override def complete(): Unit = if (!downstreamCompleted) { downstreamCompleted = true completeDownstream() } - def cancel(e: Throwable): Unit = { + override def cancel(): Unit = complete() + + override def error(e: Throwable): Unit = { if (!downstreamCompleted) { downstreamCompleted = true abortDownstream(e) @@ -105,11 +108,10 @@ private[akka] class FanoutProcessorImpl( } override def fail(e: Throwable): Unit = { - // FIXME: escalate to supervisor if (settings.debugLogging) log.debug("fail due to: {}", e.getMessage) primaryInputs.cancel() - primaryOutputs.cancel(e) + primaryOutputs.error(e) // Stopping will happen after flush } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala index 633b0b0a5a..b27f5ae94a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala @@ -121,7 +121,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMate } def registerSubscriber(subscriber: Subscriber[Any]): Unit = { - if (subscribers.contains(subscriber)) // FIXME this is not legal AFAICT, needs to check identity, not equality + if (subscribers.contains(subscriber)) rejectDuplicateSubscriber(subscriber) else { val subscription = new FutureSubscription(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala index 3daef5fd80..8af1fbeb82 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala @@ -141,7 +141,10 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin val future = f(elem) submittedSeqNo += 1 val seqNo = submittedSeqNo - future.map(FutureElement(seqNo, _)).recover { + future.map { elem ⇒ + ReactiveStreamsCompliance.requireNonNullElement(elem) + FutureElement(seqNo, elem) + }.recover { case err: Throwable if decider(err) != Supervision.Stop ⇒ FutureElement(seqNo, RecoveredError(elem, err)) case err ⇒ FutureFailure(err) diff --git a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala index 04671670b0..990a4b83b1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala @@ -80,7 +80,10 @@ private[akka] class MapAsyncUnorderedProcessorImpl(_settings: ActorFlowMateriali try { val future = f(elem) inProgressCount += 1 - future.map(FutureElement.apply).recover { + future.map { elem ⇒ + ReactiveStreamsCompliance.requireNonNullElement(elem) + FutureElement(elem) + }.recover { case err ⇒ FutureFailure(elem, err) }.pipeTo(self) } catch { diff --git a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala index e33936d22f..d6c24aa87a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala @@ -17,11 +17,13 @@ private[stream] object ReactiveStreamsCompliance { final val NumberOfElementsInRequestMustBePositiveMsg = "The number of requested elements must be > 0 (see reactive-streams specification, rule 3.9)" - final val TotalPendingDemandMustNotExceedLongMaxValue = - "Total pending demand MUST NOT be > `java.lang.Long.MAX_VALUE` (see reactive-streams specification, rule 3.17)" + final val SubscriberMustNotBeNullMsg = "Subscriber must not be null, rule 1.9" - final def totalPendingDemandMustNotExceedLongMaxValueException: Throwable = - new IllegalStateException(TotalPendingDemandMustNotExceedLongMaxValue) + final val ExceptionMustNotBeNullMsg = "Exception must not be null, rule 2.13" + + final val ElementMustNotBeNullMsg = "Element must not be null, rule 2.13" + + final val SubscriptionMustNotBeNullMsg = "Subscription must not be null, rule 2.13" final def numberOfElementsInRequestMustBePositiveException: Throwable = new IllegalArgumentException(NumberOfElementsInRequestMustBePositiveMsg) @@ -29,18 +31,44 @@ private[stream] object ReactiveStreamsCompliance { final def canNotSubscribeTheSameSubscriberMultipleTimesException: Throwable = new IllegalStateException(CanNotSubscribeTheSameSubscriberMultipleTimes) - final def rejectDuplicateSubscriber[T](subscriber: Subscriber[T]): Unit = + final def subscriberMustNotBeNullException: Throwable = + new NullPointerException(SubscriberMustNotBeNullMsg) + + final def exceptionMustNotBeNullException: Throwable = + new NullPointerException(ExceptionMustNotBeNullMsg) + + final def elementMustNotBeNullException: Throwable = + new NullPointerException(ElementMustNotBeNullMsg) + + final def subscriptionMustNotBeNullException: Throwable = + new NullPointerException(SubscriptionMustNotBeNullMsg) + + final def rejectDuplicateSubscriber[T](subscriber: Subscriber[T]): Unit = { + // since it is already subscribed it has received the subscription first + // and we can emit onError immediately tryOnError(subscriber, canNotSubscribeTheSameSubscriberMultipleTimesException) + } - final def rejectAdditionalSubscriber[T](subscriber: Subscriber[T], rejector: Publisher[T]): Unit = + final def rejectAdditionalSubscriber[T](subscriber: Subscriber[T], rejector: String): Unit = { + tryOnSubscribe(subscriber, CancelledSubscription) tryOnError(subscriber, new IllegalStateException(s"$rejector $SupportsOnlyASingleSubscriber")) - - final def rejectDueToOverflow[T](subscriber: Subscriber[T]): Unit = - tryOnError(subscriber, totalPendingDemandMustNotExceedLongMaxValueException) + } final def rejectDueToNonPositiveDemand[T](subscriber: Subscriber[T]): Unit = tryOnError(subscriber, numberOfElementsInRequestMustBePositiveException) + final def requireNonNullSubscriber[T](subscriber: Subscriber[T]): Unit = + if (subscriber eq null) throw subscriberMustNotBeNullException + + final def requireNonNullException(cause: Throwable): Unit = + if (cause eq null) throw exceptionMustNotBeNullException + + final def requireNonNullElement[T](element: T): Unit = + if (element == null) throw elementMustNotBeNullException + + final def requireNonNullSubscription(subscription: Subscription): Unit = + if (subscription == null) throw subscriptionMustNotBeNullException + @SerialVersionUID(1L) sealed trait SpecViolation extends Throwable @@ -56,15 +84,18 @@ private[stream] object ReactiveStreamsCompliance { } } - final def tryOnNext[T](subscriber: Subscriber[T], element: T): Unit = + final def tryOnNext[T](subscriber: Subscriber[T], element: T): Unit = { + requireNonNullElement(element) try subscriber.onNext(element) catch { case NonFatal(t) ⇒ throw new SignalThrewException(subscriber + ".onNext", t) } + } - final def tryOnSubscribe[T](subscriber: Subscriber[T], subscription: Subscription): Unit = + final def tryOnSubscribe[T](subscriber: Subscriber[T], subscription: Subscription): Unit = { try subscriber.onSubscribe(subscription) catch { case NonFatal(t) ⇒ throw new SignalThrewException(subscriber + ".onSubscribe", t) } + } final def tryOnComplete[T](subscriber: Subscriber[T]): Unit = try subscriber.onComplete() catch { diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 6ba4cbff81..4e7eb40b7b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -82,12 +82,23 @@ object HeadSink { /** INTERNAL API */ private[akka] class HeadSinkSubscriber[In](p: Promise[In]) extends Subscriber[In] { private val sub = new AtomicReference[Subscription] - override def onSubscribe(s: Subscription): Unit = + override def onSubscribe(s: Subscription): Unit = { + ReactiveStreamsCompliance.requireNonNullSubscription(s) if (!sub.compareAndSet(null, s)) s.cancel() else s.request(1) + } + + override def onNext(elem: In): Unit = { + ReactiveStreamsCompliance.requireNonNullElement(elem) + p.trySuccess(elem) + sub.get.cancel() + } + + override def onError(t: Throwable): Unit = { + ReactiveStreamsCompliance.requireNonNullException(t) + p.tryFailure(t) + } - override def onNext(t: In): Unit = { p.trySuccess(t); sub.get.cancel() } - override def onError(t: Throwable): Unit = p.tryFailure(t) override def onComplete(): Unit = p.tryFailure(new NoSuchElementException("empty stream")) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index 5b75df29be..440c8e6ec7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -83,7 +83,7 @@ final class PublisherSource[Out](p: Publisher[Out], val attributes: OperationAtt * may happen before or after materializing the `Flow`. * The stream terminates with an error if the `Future` is completed with a failure. */ -final class FutureSource[Out](future: Future[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) { // FIXME Why does this have anything to do with Actors? +final class FutureSource[Out](future: Future[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) { override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = future.value match { case Some(Success(element)) ⇒ @@ -105,15 +105,12 @@ final class LazyEmptySource[Out](val attributes: OperationAttributes, shape: Sou override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { val p = Promise[Unit]() - // Not TCK verified as RC1 does not allow "empty publishers", - // reactive-streams on master now contains support for empty publishers. - // so we can enable it then, though it will require external completing of the promise val pub = new Publisher[Unit] { override def subscribe(s: Subscriber[_ >: Unit]) = { + requireNonNullSubscriber(s) tryOnSubscribe(s, new Subscription { override def request(n: Long): Unit = () - - override def cancel(): Unit = p.success(()) + override def cancel(): Unit = p.trySuccess(()) }) p.future.onComplete { case Success(_) ⇒ tryOnComplete(s) @@ -136,7 +133,7 @@ final class LazyEmptySource[Out](val attributes: OperationAttributes, shape: Sou * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ -final class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Out, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Cancellable](shape) { // FIXME Why does this have anything to do with Actors? +final class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Out, val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Cancellable](shape) { override def create(materializer: ActorFlowMaterializerImpl, flowName: String) = { val cancelled = new AtomicBoolean(false) @@ -169,4 +166,4 @@ final class PropsSource[Out](props: Props, val attributes: OperationAttributes, override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = new PropsSource[Out](props, attributes, shape) override def withAttributes(attr: OperationAttributes): Module = new PropsSource(props, attr, shape) -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala index 9be72dc1e7..6057147e60 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala @@ -89,4 +89,9 @@ private[akka] class SplitWhenProcessorImpl(_settings: ActorFlowMaterializerSetti super.completeSubstreamOutput(substream) } + override def cancelSubstreamOutput(substream: SubstreamKey): Unit = { + if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream) + super.cancelSubstreamOutput(substream) + } + } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 6602e67e6d..de466eabe7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -114,13 +114,11 @@ private[stream] object Stages { override protected def newInstance: StageModule = this.copy() } - // FIXME Replace with OperateAsync final case class MapAsync(f: Any ⇒ Future[Any], attributes: OperationAttributes = mapAsync) extends StageModule { def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) override protected def newInstance: StageModule = this.copy() } - //FIXME Should be OperateUnorderedAsync final case class MapAsyncUnordered(f: Any ⇒ Future[Any], attributes: OperationAttributes = mapAsyncUnordered) extends StageModule { def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) override protected def newInstance: StageModule = this.copy() @@ -133,14 +131,12 @@ private[stream] object Stages { override protected def newInstance: StageModule = this.copy() } - //FIXME should be `n: Long` - final case class Take(n: Int, attributes: OperationAttributes = take) extends StageModule { + final case class Take(n: Long, attributes: OperationAttributes = take) extends StageModule { def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) override protected def newInstance: StageModule = this.copy() } - //FIXME should be `n: Long` - final case class Drop(n: Int, attributes: OperationAttributes = drop) extends StageModule { + final case class Drop(n: Long, attributes: OperationAttributes = drop) extends StageModule { def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) override protected def newInstance: StageModule = this.copy() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 354014905d..d780b7a738 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -35,6 +35,7 @@ private[akka] object MultiStreamOutputProcessor { case object Open extends PublisherState final case class Attached(sub: Subscriber[Any]) extends PublisherState case object Completed extends CompletedState + case object Cancelled extends CompletedState final case class Failed(e: Throwable) extends CompletedState } @@ -57,13 +58,21 @@ private[akka] object MultiStreamOutputProcessor { pump.pump() } - override def cancel(e: Throwable): Unit = { + override def error(e: Throwable): Unit = { if (!downstreamCompleted) { closePublisher(Failed(e)) downstreamCompleted = true } } + override def cancel(): Unit = { + if (!downstreamCompleted) { + closePublisher(Cancelled) + subscriber = NullSubscriber // FIXME unreference real subscriber, should not be needed after #16986 + downstreamCompleted = true + } + } + override def complete(): Unit = { if (!downstreamCompleted) { closePublisher(Completed) @@ -82,18 +91,21 @@ private[akka] object MultiStreamOutputProcessor { private def closeSubscriber(s: Subscriber[Any], withState: CompletedState): Unit = withState match { case Completed ⇒ tryOnComplete(s) + case Cancelled ⇒ // nothing to do case Failed(e: SpecViolation) ⇒ // nothing to do case Failed(e) ⇒ tryOnError(s, e) } override def subscribe(s: Subscriber[_ >: Any]): Unit = { + requireNonNullSubscriber(s) subscriptionTimeout.cancel() if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s) else { state.get() match { - case _: Attached ⇒ - tryOnError(s, new IllegalStateException("Substream publisher " + SupportsOnlyASingleSubscriber)) + case _: Attached | Cancelled ⇒ + rejectAdditionalSubscriber(s, "Substream publisher") case c: CompletedState ⇒ + tryOnSubscribe(s, CancelledSubscription) closeSubscriber(s, c) case Open ⇒ throw new IllegalStateException("Publisher cannot become open after being used before") @@ -106,7 +118,7 @@ private[akka] object MultiStreamOutputProcessor { subscriber = s tryOnSubscribe(subscriber, subscription) } else - tryOnError(subscriber, new IllegalStateException("Substream publisher " + SupportsOnlyASingleSubscriber)) + rejectAdditionalSubscriber(s, "Substream publisher") } } @@ -133,10 +145,19 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc } protected def invalidateSubstreamOutput(substream: SubstreamKey): Unit = { - completeSubstreamOutput(substream) + cancelSubstreamOutput(substream) pump() } + protected def cancelSubstreamOutput(substream: SubstreamKey): Unit = { + substreamOutputs.get(substream) match { + case Some(sub) ⇒ + sub.cancel() + substreamOutputs -= substream + case _ ⇒ // ignore, already completed... + } + } + protected def completeSubstreamOutput(substream: SubstreamKey): Unit = { substreamOutputs.get(substream) match { case Some(sub) ⇒ @@ -147,7 +168,7 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc } protected def failOutputs(e: Throwable): Unit = { - substreamOutputs.values foreach (_.cancel(e)) + substreamOutputs.values foreach (_.error(e)) } protected def finishOutputs(): Unit = { @@ -155,10 +176,15 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc } val outputSubstreamManagement: Receive = { - case SubstreamRequestMore(key, demand) ⇒ substreamOutputs.get(key) match { - case Some(sub) ⇒ sub.enqueueOutputDemand(demand) - case _ ⇒ // ignore... - } + case SubstreamRequestMore(key, demand) ⇒ + substreamOutputs.get(key) match { + case Some(sub) ⇒ + if (demand < 1) // According to Reactive Streams Spec 3.9, with non-positive demand must yield onError + sub.error(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException) + else + sub.enqueueOutputDemand(demand) + case _ ⇒ // ignore... + } case SubstreamSubscribe(key, subscriber) ⇒ substreamOutputs.get(key) match { case Some(sub) ⇒ sub.attachSubscriber(subscriber) case _ ⇒ // ignore... @@ -167,12 +193,13 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc case Some(sub) if !sub.isAttached() ⇒ subscriptionTimedOut(sub) case _ ⇒ // ignore... } - case SubstreamCancel(key) ⇒ invalidateSubstreamOutput(key) + case SubstreamCancel(key) ⇒ + invalidateSubstreamOutput(key) } override protected def handleSubscriptionTimeout(target: Publisher[_], cause: Exception) = target match { case s: SubstreamOutput ⇒ - s.cancel(cause) + s.error(cause) s.attachSubscriber(CancelingSubscriber) case _ ⇒ // ignore } @@ -205,10 +232,19 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: ActorFlowMate */ private[akka] object TwoStreamInputProcessor { class OtherActorSubscriber[T](val impl: ActorRef) extends Subscriber[T] { - override def onError(cause: Throwable): Unit = impl ! OtherStreamOnError(cause) + override def onError(cause: Throwable): Unit = { + ReactiveStreamsCompliance.requireNonNullException(cause) + impl ! OtherStreamOnError(cause) + } override def onComplete(): Unit = impl ! OtherStreamOnComplete - override def onNext(element: T): Unit = impl ! OtherStreamOnNext(element) - override def onSubscribe(subscription: Subscription): Unit = impl ! OtherStreamOnSubscribe(subscription) + override def onNext(element: T): Unit = { + ReactiveStreamsCompliance.requireNonNullElement(element) + impl ! OtherStreamOnNext(element) + } + override def onSubscribe(subscription: Subscription): Unit = { + ReactiveStreamsCompliance.requireNonNullSubscription(subscription) + impl ! OtherStreamOnSubscribe(subscription) + } } case object OtherStreamOnComplete extends DeadLetterSuppression @@ -264,10 +300,19 @@ private[akka] object MultiStreamInputProcessor { case class SubstreamKey(id: Long) class SubstreamSubscriber[T](val impl: ActorRef, key: SubstreamKey) extends Subscriber[T] { - override def onError(cause: Throwable): Unit = impl ! SubstreamOnError(key, cause) + override def onError(cause: Throwable): Unit = { + ReactiveStreamsCompliance.requireNonNullException(cause) + impl ! SubstreamOnError(key, cause) + } override def onComplete(): Unit = impl ! SubstreamOnComplete(key) - override def onNext(element: T): Unit = impl ! SubstreamOnNext(key, element) - override def onSubscribe(subscription: Subscription): Unit = impl ! SubstreamStreamOnSubscribe(key, subscription) + override def onNext(element: T): Unit = { + ReactiveStreamsCompliance.requireNonNullElement(element) + impl ! SubstreamOnNext(key, element) + } + override def onSubscribe(subscription: Subscription): Unit = { + ReactiveStreamsCompliance.requireNonNullSubscription(subscription) + impl ! SubstreamStreamOnSubscribe(key, subscription) + } } case class SubstreamOnComplete(key: SubstreamKey) diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala index ce714b3605..de9a21a1cc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala @@ -16,10 +16,19 @@ object StreamSubscriptionTimeoutSupport { * A subscriber who calls `cancel` directly from `onSubscribe` and ignores all other callbacks. */ final case object CancelingSubscriber extends Subscriber[Any] { - override def onSubscribe(s: Subscription): Unit = s.cancel() - override def onError(t: Throwable): Unit = () + override def onSubscribe(s: Subscription): Unit = { + ReactiveStreamsCompliance.requireNonNullSubscription(s) + s.cancel() + } + override def onError(t: Throwable): Unit = { + ReactiveStreamsCompliance.requireNonNullException(t) + () + } override def onComplete(): Unit = () - override def onNext(t: Any): Unit = () + override def onNext(elem: Any): Unit = { + ReactiveStreamsCompliance.requireNonNullElement(elem) + () + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala index 56bede5603..844746d72b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala @@ -113,40 +113,36 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff } else { endOfStream match { case eos @ (NotReached | Completed) ⇒ - val demand = subscription.totalDemand + elements - //Check for overflow - if (demand < 1) { - try tryOnError(subscription.subscriber, totalPendingDemandMustNotExceedLongMaxValueException) - finally unregisterSubscriptionInternal(subscription) - } else { - subscription.totalDemand = demand - // returns Long.MinValue if the subscription is to be terminated - @tailrec def dispatchFromBufferAndReturnRemainingRequested(requested: Long, eos: EndOfStream): Long = - if (requested == 0) { - // if we are at end-of-stream and have nothing more to read we complete now rather than after the next `requestMore` - if ((eos ne NotReached) && buffer.count(subscription) == 0) Long.MinValue else 0 - } else if (buffer.count(subscription) > 0) { - val goOn = try { - subscription.dispatch(buffer.read(subscription)) - true - } catch { - case _: SpecViolation ⇒ - unregisterSubscriptionInternal(subscription) - false - } - if (goOn) dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos) - else Long.MinValue - } else if (eos ne NotReached) Long.MinValue - else requested + val d = subscription.totalDemand + elements + // Long overflow, Reactive Streams Spec 3:17: effectively unbounded + val demand = if (d < 1) Long.MaxValue else d + subscription.totalDemand = demand + // returns Long.MinValue if the subscription is to be terminated + @tailrec def dispatchFromBufferAndReturnRemainingRequested(requested: Long, eos: EndOfStream): Long = + if (requested == 0) { + // if we are at end-of-stream and have nothing more to read we complete now rather than after the next `requestMore` + if ((eos ne NotReached) && buffer.count(subscription) == 0) Long.MinValue else 0 + } else if (buffer.count(subscription) > 0) { + val goOn = try { + subscription.dispatch(buffer.read(subscription)) + true + } catch { + case _: SpecViolation ⇒ + unregisterSubscriptionInternal(subscription) + false + } + if (goOn) dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos) + else Long.MinValue + } else if (eos ne NotReached) Long.MinValue + else requested - dispatchFromBufferAndReturnRemainingRequested(demand, eos) match { - case Long.MinValue ⇒ - eos(subscription.subscriber) - unregisterSubscriptionInternal(subscription) - case x ⇒ - subscription.totalDemand = x - requestFromUpstreamIfRequired() - } + dispatchFromBufferAndReturnRemainingRequested(demand, eos) match { + case Long.MinValue ⇒ + eos(subscription.subscriber) + unregisterSubscriptionInternal(subscription) + case x ⇒ + subscription.totalDemand = x + requestFromUpstreamIfRequired() } case ErrorCompleted(_) ⇒ // ignore, the Subscriber might not have seen our error event yet } @@ -227,7 +223,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff * Register a new subscriber. */ protected def registerSubscriber(subscriber: Subscriber[_ >: T]): Unit = endOfStream match { - case NotReached if subscriptions.exists(_.subscriber eq subscriber) ⇒ ReactiveStreamsCompliance.rejectDuplicateSubscriber(subscriber) + case NotReached if subscriptions.exists(_.subscriber == subscriber) ⇒ ReactiveStreamsCompliance.rejectDuplicateSubscriber(subscriber) case NotReached ⇒ addSubscription(subscriber) case Completed if buffer.nonEmpty ⇒ addSubscription(subscriber) case eos ⇒ eos(subscriber) diff --git a/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala index c6931268f2..65ff9feb5b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala @@ -10,6 +10,7 @@ import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.annotation.tailrec import scala.collection.immutable import scala.util.control.NonFatal +import akka.stream.impl.ReactiveStreamsCompliance._ /** * INTERNAL API @@ -56,10 +57,9 @@ private[akka] object SynchronousIterablePublisher { rejectDueToNonPositiveDemand(subscriber) } else { pendingDemand += elements - if (pendingDemand < 1) { // According to Reactive Streams Spec 3:17, if we overflow 2^63-1, we need to yield onError - cancel() - rejectDueToOverflow(subscriber) - } else if (!pushing) { + if (pendingDemand < 1) + pendingDemand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded + if (!pushing) { // According to Reactive Streams Spec 3:3, we must prevent unbounded recursion try { pushing = true @@ -113,8 +113,10 @@ private[akka] final class SynchronousIterablePublisher[T]( import SynchronousIterablePublisher.IteratorSubscription - override def subscribe(subscriber: Subscriber[_ >: T]): Unit = + override def subscribe(subscriber: Subscriber[_ >: T]): Unit = { + requireNonNullSubscriber(subscriber) IteratorSubscription(subscriber, try iterable.iterator catch { case NonFatal(t) ⇒ Iterator.continually(throw t) }) + } override def toString: String = name } diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index 267334c6b6..47cf953ac2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -11,6 +11,7 @@ import scala.collection.mutable import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal import akka.actor.DeadLetterSuppression +import akka.event.Logging /** * INTERNAL API @@ -98,8 +99,8 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite handleFailure(numberOfElementsInRequestMustBePositiveException) } else { demand += elements - if (demand < 0) // Long has overflown, reactive-streams specification rule 3.17 - handleFailure(totalPendingDemandMustNotExceedLongMaxValueException) + if (demand < 0) + demand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded } case Cancel ⇒ @@ -116,7 +117,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite subscriber = s tryOnSubscribe(s, subscription) case _ ⇒ - rejectAdditionalSubscriber(s, exposedPublisher) + rejectAdditionalSubscriber(s, s"${Logging.simpleName(this)}") } override def postStop(): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala index be68e3f2c0..23835646d7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala @@ -69,7 +69,8 @@ private[akka] trait Outputs { def demandCount: Long = -1L def complete(): Unit - def cancel(e: Throwable): Unit + def cancel(): Unit + def error(e: Throwable): Unit def isClosed: Boolean def isOpen: Boolean = !isClosed } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index 06447a0c26..35bd75bdf7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -218,7 +218,7 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boole subscriber = sub tryOnSubscribe(subscriber, new ActorSubscription(actor, subscriber)) } else - tryOnError(sub, new IllegalStateException(s"${Logging.simpleName(this)} ${SupportsOnlyASingleSubscriber}")) + rejectAdditionalSubscriber(subscriber, s"${Logging.simpleName(this)}") } protected def waitingExposedPublisher: Actor.Receive = { @@ -238,11 +238,9 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boole fail(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException) } else { downstreamDemand += elements - // Long has overflown - if (downstreamDemand < 0) { - enter().finish() - fail(ReactiveStreamsCompliance.totalPendingDemandMustNotExceedLongMaxValueException) - } else if (upstreamWaiting) { + if (downstreamDemand < 0) + downstreamDemand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded + if (upstreamWaiting) { upstreamWaiting = false enter().pull() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala index 6c635a7d45..ad9ec8b3b0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala @@ -8,6 +8,7 @@ import scala.collection.breakOut import scala.util.control.NonFatal import akka.stream.stage._ import akka.stream.Supervision +import akka.stream.impl.ReactiveStreamsCompliance // TODO: // fix jumpback table with keep-going-on-complete ops (we might jump between otherwise isolated execution regions) @@ -221,6 +222,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: def run(): Unit override def push(elem: Any): DownstreamDirective = { + ReactiveStreamsCompliance.requireNonNullElement(elem) if (currentOp.holding) throw new IllegalStateException("Cannot push while holding, only pushAndPull") currentOp.allowedToPush = false elementInFlight = elem @@ -244,6 +246,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: def isFinishing: Boolean = currentOp.terminationPending override def pushAndFinish(elem: Any): DownstreamDirective = { + ReactiveStreamsCompliance.requireNonNullElement(elem) pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp] // This MUST be an unsafeFork because the execution of PushFinish MUST strictly come before the finish execution // path. Other forks are not order dependent because they execute on isolated execution domains which cannot @@ -271,6 +274,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: override def isHolding: Boolean = currentOp.holding override def pushAndPull(elem: Any): FreeDirective = { + ReactiveStreamsCompliance.requireNonNullElement(elem) if (!currentOp.holding) throw new IllegalStateException("Cannot pushAndPull without holding first") currentOp.holding = false fork(Pushing, elem) @@ -301,6 +305,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: override def run(): Unit = currentOp.onPush(elementInFlight, ctx = this) override def pushAndFinish(elem: Any): DownstreamDirective = { + ReactiveStreamsCompliance.requireNonNullElement(elem) elementInFlight = elem state = PushFinish null @@ -493,6 +498,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: override def advance(): Unit = () override def push(elem: Any): DownstreamDirective = { + ReactiveStreamsCompliance.requireNonNullElement(elem) activeOpIndex = entryPoint super.push(elem) execute() @@ -528,6 +534,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: } override def pushAndPull(elem: Any): FreeDirective = { + ReactiveStreamsCompliance.requireNonNullElement(elem) activeOpIndex = entryPoint super.pushAndPull(elem) execute() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index f3495155d1..75df37f62d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -75,8 +75,8 @@ private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out], /** * INTERNAL API */ -private[akka] final case class Take[T](count: Int) extends PushStage[T, T] { - private var left: Int = count +private[akka] final case class Take[T](count: Long) extends PushStage[T, T] { + private var left: Long = count override def onPush(elem: T, ctx: Context[T]): Directive = { left -= 1 @@ -89,8 +89,8 @@ private[akka] final case class Take[T](count: Int) extends PushStage[T, T] { /** * INTERNAL API */ -private[akka] final case class Drop[T](count: Int) extends PushStage[T, T] { - private var left: Int = count +private[akka] final case class Drop[T](count: Long) extends PushStage[T, T] { + private var left: Long = count override def onPush(elem: T, ctx: Context[T]): Directive = if (left > 0) { left -= 1 diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/DelayedInitProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/DelayedInitProcessor.scala index f5be974f28..672494578b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/DelayedInitProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/DelayedInitProcessor.scala @@ -20,12 +20,16 @@ private[akka] class DelayedInitProcessor[I, O](val implFuture: Future[Processor[ @volatile private var impl: Processor[I, O] = _ private val setVarFuture = implFuture.andThen { case Success(p) ⇒ impl = p } - override def onSubscribe(s: Subscription): Unit = setVarFuture.onComplete { - case Success(x) ⇒ tryOnSubscribe(x, s) - case Failure(_) ⇒ s.cancel() + override def onSubscribe(s: Subscription): Unit = { + requireNonNullSubscription(s) + setVarFuture.onComplete { + case Success(x) ⇒ tryOnSubscribe(x, s) + case Failure(_) ⇒ s.cancel() + } } override def onError(t: Throwable): Unit = { + requireNonNullException(t) if (impl eq null) setVarFuture.onSuccess { case p ⇒ p.onError(t) } else impl.onError(t) } @@ -35,10 +39,16 @@ private[akka] class DelayedInitProcessor[I, O](val implFuture: Future[Processor[ else impl.onComplete() } - override def onNext(t: I): Unit = impl.onNext(t) + override def onNext(t: I): Unit = { + requireNonNullElement(t) + impl.onNext(t) + } - override def subscribe(s: Subscriber[_ >: O]): Unit = setVarFuture.onComplete { - case Success(x) ⇒ x.subscribe(s) - case Failure(e) ⇒ s.onError(e) + override def subscribe(s: Subscriber[_ >: O]): Unit = { + requireNonNullSubscriber(s) + setVarFuture.onComplete { + case Success(x) ⇒ x.subscribe(s) + case Failure(e) ⇒ s.onError(e) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index c12ad0a1e0..b3971f1235 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -133,10 +133,11 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS } - override def cancel(e: Throwable): Unit = { + override def error(e: Throwable): Unit = { if (!closed && initialized) connection ! Abort closed = true } + override def complete(): Unit = { if (!closed && initialized) { closed = true @@ -144,10 +145,13 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS connection ! Close else connection ! ConfirmedClose - } } + + override def cancel(): Unit = complete() + override def enqueueOutputElement(elem: Any): Unit = { + ReactiveStreamsCompliance.requireNonNullElement(elem) connection ! Write(elem.asInstanceOf[ByteString], WriteAck) pendingDemand = false } @@ -217,9 +221,9 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS if (settings.debugLogging) log.debug("fail due to: {}", e.getMessage) tcpInputs.cancel() - tcpOutputs.cancel(e) + tcpOutputs.error(e) primaryInputs.cancel() - primaryOutputs.cancel(e) + primaryOutputs.error(e) } def tryShutdown(): Unit = diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index 2ac49d0760..69d602763a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -90,8 +90,10 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket val ex = BindFailedException localAddressPromise.failure(ex) unbindPromise.success(() ⇒ Future.successful(())) - try tryOnError(flowSubscriber, ex) - finally fail(ex) + try { + tryOnSubscribe(flowSubscriber, CancelledSubscription) + tryOnError(flowSubscriber, ex) + } finally fail(ex) } def running: Receive = { @@ -159,6 +161,6 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket if (settings.debugLogging) log.debug("fail due to: {}", e.getMessage) incomingConnections.cancel() - primaryOutputs.cancel(e) + primaryOutputs.error(e) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index d8f6025b97..722015daa6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -210,7 +210,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. */ - def drop(n: Int): javadsl.Flow[In, Out, Mat] = + def drop(n: Long): javadsl.Flow[In, Out, Mat] = new Flow(delegate.drop(n)) /** @@ -228,7 +228,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * The stream will be completed without producing any elements if `n` is zero * or negative. */ - def take(n: Int): javadsl.Flow[In, Out, Mat] = + def take(n: Long): javadsl.Flow[In, Out, Mat] = new Flow(delegate.take(n)) /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index bb61252f14..1ec33e30ef 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -333,7 +333,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. */ - def drop(n: Int): javadsl.Source[Out, Mat] = + def drop(n: Long): javadsl.Source[Out, Mat] = new Source(delegate.drop(n)) /** @@ -350,7 +350,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * * @param n if `n` is zero or negative the stream will be completed without producing any elements. */ - def take(n: Int): javadsl.Source[Out, Mat] = + def take(n: Long): javadsl.Source[Out, Mat] = new Source(delegate.take(n)) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 9606b6bc5e..7b5d1147e2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -325,7 +325,7 @@ trait FlowOps[+Out, +Mat] { * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. */ - def drop(n: Int): Repr[Out, Mat] = andThen(Drop(n)) + def drop(n: Long): Repr[Out, Mat] = andThen(Drop(n)) /** * Discard the elements received within the given duration at beginning of the stream. @@ -355,7 +355,7 @@ trait FlowOps[+Out, +Mat] { * The stream will be completed without producing any elements if `n` is zero * or negative. */ - def take(n: Int): Repr[Out, Mat] = andThen(Take(n)) + def take(n: Long): Repr[Out, Mat] = andThen(Take(n)) /** * Terminate processing (and cancel the upstream publisher) after the given @@ -556,4 +556,4 @@ private[stream] object FlowOps { def completedTransformer[T]: TransformerLike[T, T] = CompletedTransformer.asInstanceOf[TransformerLike[T, T]] def identityTransformer[T]: TransformerLike[T, T] = IdentityTransformer.asInstanceOf[TransformerLike[T, T]] -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala index 4d3f956aa8..2435eda06a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala @@ -36,17 +36,6 @@ final case class OperationAttributes private (attributes: List[OperationAttribut if ((this eq OperationAttributes.none) || (this eq node.attributes)) node else node.withAttributes(attributes = this and node.attributes) - /** - * Filtering out name attributes is needed for Vertex.newInstance(). - * However there is an ongoing discussion for removing this feature, - * after which this will not be needed anymore. - * - * https://github.com/akka/akka/issues/16392 - */ - private[akka] def withoutName = this.copy( // FIXME should return OperationAttributes.none if empty - attributes = attributes.filterNot { // FIXME should return the same instance if didn't have any Name - case attr: Name ⇒ true - }) } object OperationAttributes { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala index a65cf9297b..864e510754 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala @@ -21,6 +21,7 @@ import akka.io.Inet.SocketOption import akka.io.Tcp import akka.stream._ import akka.stream.impl._ +import akka.stream.impl.ReactiveStreamsCompliance._ import akka.stream.scaladsl._ import akka.util.ByteString import org.reactivestreams.{ Publisher, Processor, Subscriber, Subscription } @@ -92,6 +93,7 @@ class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { val publisher = new Publisher[IncomingConnection] { override def subscribe(s: Subscriber[_ >: IncomingConnection]): Unit = { + requireNonNullSubscriber(s) manager ! StreamTcpManager.Bind( localAddressPromise, unbindPromise,