From 4f362f91df12e34cd02b4075043e1b448da9f242 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 6 Oct 2014 14:04:05 +0200 Subject: [PATCH] =str #16040 Use scaladsl2 in TCK tests * found and fixed one issue: onError was invoked twice for the overflow error * and also noticed that we restart the actors, instead of stopping them in case of internal error, changed to stopping strategy --- .../AkkaIdentityProcessorVerification.scala | 10 ++--- .../tck/AkkaPublisherVerification.scala | 6 ++- .../tck/AkkaSubscriberVerification.scala | 12 +++--- .../akka/stream/tck/FanoutProcessorTest.scala | 41 ------------------- .../akka/stream/tck/FanoutPublisherTest.scala | 21 ++++++++++ .../stream/tck/IterablePublisherTest.scala | 8 ++-- .../stream/tck/IteratorPublisherTest.scala | 8 ++-- .../tck/SimpleCallbackPublisherTest.scala | 5 ++- .../stream/tck/TransformProcessorTest.scala | 9 ++-- .../akka/stream/scaladsl2/FlowSpec.scala | 24 +++++------ .../akka/stream/impl/ActorProcessor.scala | 3 +- .../impl2/ActorBasedFlowMaterializer.scala | 7 +++- 12 files changed, 70 insertions(+), 84 deletions(-) delete mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/FanoutProcessorTest.scala create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/FanoutPublisherTest.scala diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala index 212fb98055..b5aeb8dcc8 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala @@ -3,9 +3,11 @@ */ package akka.stream.tck +import scala.collection.immutable import akka.actor.ActorSystem -import akka.stream._ -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl2.FlowMaterializer +import akka.stream.scaladsl2.PublisherDrain +import akka.stream.scaladsl2.Source import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit import akka.testkit.EventFilter @@ -15,8 +17,6 @@ import org.reactivestreams.tck.IdentityProcessorVerification import org.reactivestreams.tck.TestEnvironment import org.scalatest.testng.TestNGSuiteLike -import scala.collection.immutable - abstract class AkkaIdentityProcessorVerification[T](val system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long) extends IdentityProcessorVerification[T](env, publisherShutdownTimeout) with TestNGSuiteLike { @@ -48,7 +48,7 @@ abstract class AkkaIdentityProcessorVerification[T](val system: ActorSystem, env if (elements == Long.MaxValue) 1 to Int.MaxValue else 0 until elements.toInt - Flow(iterable).toPublisher() + Source(iterable).runWith(PublisherDrain()) } /** By default Akka Publishers do not support Fanout! */ diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala index 5b452f0aa1..35575ae11f 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala @@ -3,15 +3,17 @@ */ package akka.stream.tck +import scala.concurrent.duration._ + import akka.actor.ActorSystem -import akka.stream._ +import akka.stream.MaterializerSettings +import akka.stream.scaladsl2.FlowMaterializer import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit import org.reactivestreams.Publisher import org.reactivestreams.tck.{ PublisherVerification, TestEnvironment } import org.scalatest.testng.TestNGSuiteLike import org.testng.annotations.AfterClass -import scala.concurrent.duration._ abstract class AkkaPublisherVerification[T](val system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long) extends PublisherVerification[T](env, publisherShutdownTimeout) diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaSubscriberVerification.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaSubscriberVerification.scala index f28e41307f..a12bce8c48 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaSubscriberVerification.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaSubscriberVerification.scala @@ -3,10 +3,13 @@ */ package akka.stream.tck +import scala.collection.immutable +import scala.concurrent.duration._ import akka.actor.ActorSystem -import akka.stream.FlowMaterializer import akka.stream.MaterializerSettings -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl2.FlowMaterializer +import akka.stream.scaladsl2.PublisherDrain +import akka.stream.scaladsl2.Source import akka.stream.testkit.AkkaSpec import org.reactivestreams.Publisher import org.reactivestreams.tck.SubscriberBlackboxVerification @@ -15,9 +18,6 @@ import org.reactivestreams.tck.TestEnvironment import org.scalatest.testng.TestNGSuiteLike import org.testng.annotations.AfterClass -import scala.collection.immutable -import scala.concurrent.duration._ - abstract class AkkaSubscriberBlackboxVerification[T](val system: ActorSystem, env: TestEnvironment) extends SubscriberBlackboxVerification[T](env) with TestNGSuiteLike with AkkaSubscriberVerificationLike { @@ -62,7 +62,7 @@ trait AkkaSubscriberVerificationLike { if (elements == Long.MaxValue) 1 to Int.MaxValue else 0 until elements.toInt - Flow(iterable).toPublisher() + Source(iterable).runWith(PublisherDrain()) } @AfterClass diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FanoutProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FanoutProcessorTest.scala deleted file mode 100644 index 2d2c702c86..0000000000 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FanoutProcessorTest.scala +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.tck - -import java.util.concurrent.atomic.AtomicInteger - -import akka.stream._ -import akka.stream.impl.ActorBasedFlowMaterializer -import akka.stream.impl.Ast -import org.reactivestreams.Processor -import org.reactivestreams.Publisher - -class FanoutProcessorTest extends AkkaIdentityProcessorVerification[Int] { - - val processorCounter = new AtomicInteger - - override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = { - val settings = MaterializerSettings(system) - .withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize) - - implicit val materializer = FlowMaterializer(settings)(system) - - val flowName = getClass.getSimpleName + "-" + processorCounter.incrementAndGet() - - val processor = materializer.asInstanceOf[ActorBasedFlowMaterializer].processorForNode( - Ast.FanoutBox(initialBufferSize = maxBufferSize / 2, maxBufferSize), flowName, 1) - - processor.asInstanceOf[Processor[Int, Int]] - } - - override def createHelperPublisher(elements: Long): Publisher[Int] = { - implicit val mat = FlowMaterializer()(system) - - createSimpleIntPublisher(elements)(mat) - } - - /** The Fanout Processor actually supports fanout */ - override def maxElementsFromPublisher = Long.MaxValue - -} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FanoutPublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FanoutPublisherTest.scala new file mode 100644 index 0000000000..d2db157d80 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FanoutPublisherTest.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import scala.collection.immutable +import akka.stream.scaladsl2.FanoutPublisherDrain +import akka.stream.scaladsl2.Source +import org.reactivestreams.Publisher + +class FanoutPublisherTest extends AkkaPublisherVerification[Int] { + + def createPublisher(elements: Long): Publisher[Int] = { + val iterable: immutable.Iterable[Int] = + if (elements == 0) new immutable.Iterable[Int] { override def iterator = Iterator from 0 } + else 0 until elements.toInt + + Source(iterable).runWith(FanoutPublisherDrain(initialBufferSize = 2, maximumBufferSize = 4)) + } + +} diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala index d554532206..994ff60641 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala @@ -3,10 +3,10 @@ */ package akka.stream.tck -import akka.stream.scaladsl.Flow -import org.reactivestreams._ - import scala.collection.immutable +import akka.stream.scaladsl2.PublisherDrain +import akka.stream.scaladsl2.Source +import org.reactivestreams._ class IterablePublisherTest extends AkkaPublisherVerification[Int] { @@ -17,7 +17,7 @@ class IterablePublisherTest extends AkkaPublisherVerification[Int] { else 0 until elements.toInt - Flow(iterable).toPublisher() + Source(iterable).runWith(PublisherDrain()) } } \ No newline at end of file diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/IteratorPublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/IteratorPublisherTest.scala index 23a6ab9ef5..2ff8e48a21 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/IteratorPublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/IteratorPublisherTest.scala @@ -3,10 +3,10 @@ */ package akka.stream.tck -import akka.stream.scaladsl.Flow -import org.reactivestreams.Publisher - import scala.collection.immutable +import akka.stream.scaladsl2.PublisherDrain +import akka.stream.scaladsl2.Source +import org.reactivestreams.Publisher class IteratorPublisherTest extends AkkaPublisherVerification[Int](true) { @@ -15,7 +15,7 @@ class IteratorPublisherTest extends AkkaPublisherVerification[Int](true) { if (elements == 0) new immutable.Iterable[Int] { override def iterator = Iterator from 0 } else 0 until elements.toInt - Flow(iterable).toPublisher() + Source(iterable).runWith(PublisherDrain()) } } \ No newline at end of file diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SimpleCallbackPublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SimpleCallbackPublisherTest.scala index 5f8891d368..8d654638fd 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SimpleCallbackPublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SimpleCallbackPublisherTest.scala @@ -3,7 +3,8 @@ */ package akka.stream.tck -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl2.PublisherDrain +import akka.stream.scaladsl2.Source import org.reactivestreams._ class SimpleCallbackPublisherTest extends AkkaPublisherVerification[Int] { @@ -11,7 +12,7 @@ class SimpleCallbackPublisherTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { val iter = Iterator from 0 val iter2 = if (elements > 0) iter take elements.toInt else iter - Flow(() ⇒ if (iter2.hasNext) Some(iter2.next()) else None).toPublisher() + Source(() ⇒ if (iter2.hasNext) Some(iter2.next()) else None).runWith(PublisherDrain()) } } \ No newline at end of file diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala index 45854094cc..bba51007cb 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala @@ -3,11 +3,12 @@ */ package akka.stream.tck +import akka.stream.MaterializerSettings +import akka.stream.Transformer +import akka.stream.impl2.ActorBasedFlowMaterializer +import akka.stream.impl2.Ast +import akka.stream.scaladsl2.FlowMaterializer import java.util.concurrent.atomic.AtomicInteger - -import akka.stream._ -import akka.stream.impl.ActorBasedFlowMaterializer -import akka.stream.impl.Ast import org.reactivestreams.Processor import org.reactivestreams.Publisher diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala index f1e4ddfd6e..877678d65b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala @@ -49,7 +49,7 @@ object FlowSpec { namePrefix: String, brokenMessage: Any) extends ActorBasedFlowMaterializer(settings, supervisor, flowNameCounter, namePrefix) { - override protected def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { + override def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { val props = op match { case t: Transform ⇒ Props(new BrokenTransformProcessorImpl(settings, t.mkTransformer(), brokenMessage)) case o ⇒ ActorProcessorFactory.props(this, o) @@ -556,20 +556,18 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece try { system.eventStream.publish(Mute(filters)) - EventFilter[akka.actor.PostRestartException](occurrences = 1) intercept { - upstream.expectRequest(upstreamSubscription, 1) - upstreamSubscription.sendNext("a3") - upstreamSubscription.expectCancellation() + upstream.expectRequest(upstreamSubscription, 1) + upstreamSubscription.sendNext("a3") + upstreamSubscription.expectCancellation() - // IllegalStateException terminated abruptly - checkError(downstream) - checkError(downstream2) + // IllegalStateException terminated abruptly + checkError(downstream) + checkError(downstream2) - val downstream3 = StreamTestKit.SubscriberProbe[Any]() - publisher.subscribe(downstream3) - // IllegalStateException terminated abruptly - checkError(downstream3) - } + val downstream3 = StreamTestKit.SubscriberProbe[Any]() + publisher.subscribe(downstream3) + // IllegalStateException terminated abruptly + checkError(downstream3) } finally { system.eventStream.publish(UnMute(filters)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index b5cf030242..1819081830 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -226,7 +226,6 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D if (downstreamDemand < 0) { // Long has overflown val demandOverflowException = new IllegalStateException(ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue) - subscriber.onError(demandOverflowException) cancel(demandOverflowException) } @@ -293,7 +292,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin override def postRestart(reason: Throwable): Unit = { super.postRestart(reason) - throw new IllegalStateException("This actor cannot be restarted") + throw new IllegalStateException("This actor cannot be restarted", reason) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala index 4b4762dabc..44ab9c482b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -194,7 +194,10 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting override def onNext(element: Any) = List(element) }) - protected def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { + /** + * INTERNAL API + */ + private[akka] def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { val impl = actorOf(ActorProcessorFactory.props(this, op), s"$flowName-$n-${op.name}") ActorProcessorFactory(impl) } @@ -284,6 +287,8 @@ private[akka] object StreamSupervisor { private[akka] class StreamSupervisor(settings: MaterializerSettings) extends Actor { import StreamSupervisor._ + override def supervisorStrategy = SupervisorStrategy.stoppingStrategy + def receive = { case Materialize(props, name) ⇒ val impl = context.actorOf(props, name)