diff --git a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala index 0ea082d6e2..32bc8a831f 100644 --- a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala @@ -5,14 +5,28 @@ package akka.stream import org.scalatest.testng.TestNGSuiteLike import org.reactivestreams.spi.Publisher -import org.reactivestreams.tck.PublisherVerification +import org.reactivestreams.tck.{ TestEnvironment, PublisherVerification } import akka.stream.impl.ActorBasedFlowMaterializer import org.reactivestreams.api.Producer import akka.stream.scaladsl.Flow +import akka.actor.ActorSystem +import akka.testkit.AkkaSpec -class ActorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { +class ActorProducerTest(_system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long) + extends PublisherVerification[Int](env, publisherShutdownTimeout) + with WithActorSystem with TestNGSuiteLike { + + implicit val system = _system import system.dispatcher + def this(system: ActorSystem) { + this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system)), Timeouts.publisherShutdownTimeoutMillis) + } + + def this() { + this(ActorSystem(classOf[ActorProducerTest].getSimpleName, AkkaSpec.testConf)) + } + private val materializer = FlowMaterializer(MaterializerSettings()) private def createProducer(elements: Int): Producer[Int] = { @@ -30,4 +44,5 @@ class ActorProducerTest extends PublisherVerification[Int] with WithActorSystem pub.getPublisher } + override def createErrorStatePublisher(): Publisher[Int] = null // ignore error-state tests } \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala index d70d14e8c9..fa18cc21bd 100644 --- a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala @@ -6,17 +6,29 @@ package akka.stream import org.scalatest.testng.TestNGSuiteLike import org.reactivestreams.spi.Publisher import org.reactivestreams.api.Processor -import org.reactivestreams.tck.IdentityProcessorVerification -import akka.actor.Props +import org.reactivestreams.tck.{ PublisherVerification, TestEnvironment, IdentityProcessorVerification } +import akka.actor.{ ActorSystem, Props } import akka.stream.impl.ActorProcessor import akka.stream.impl.TransformProcessorImpl import akka.stream.impl.Ast -import akka.testkit.TestEvent -import akka.testkit.EventFilter +import akka.testkit.{ AkkaSpec, TestEvent, EventFilter } import akka.stream.impl.ActorBasedFlowMaterializer import akka.stream.scaladsl.Flow -class IdentityProcessorTest extends IdentityProcessorVerification[Int] with WithActorSystem with TestNGSuiteLike { +class IdentityProcessorTest(_system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long) + extends IdentityProcessorVerification[Int](env, publisherShutdownTimeout) + with WithActorSystem with TestNGSuiteLike { + + implicit val system = _system + import system.dispatcher + + def this(system: ActorSystem) { + this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system)), Timeouts.publisherShutdownTimeoutMillis) + } + + def this() { + this(ActorSystem(classOf[IdentityProcessorTest].getSimpleName, AkkaSpec.testConf)) + } system.eventStream.publish(TestEvent.Mute(EventFilter[RuntimeException]("Test exception"))) @@ -44,4 +56,7 @@ class IdentityProcessorTest extends IdentityProcessorVerification[Int] with With Flow(if (elements > 0) iter take elements else iter).toProducer(materializer).getPublisher } + override def createErrorStatePublisher(): Publisher[Int] = null // ignore error-state tests + + override def createCompletedStatePublisher(): Publisher[Int] = null // ignore completed-state tests } diff --git a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala index 8e4414fd17..2ce51f4fb8 100644 --- a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala @@ -5,11 +5,25 @@ package akka.stream import org.scalatest.testng.TestNGSuiteLike import org.reactivestreams.spi.Publisher -import org.reactivestreams.tck.PublisherVerification +import org.reactivestreams.tck.{ TestEnvironment, PublisherVerification } import scala.collection.immutable import akka.stream.scaladsl.Flow +import akka.actor.ActorSystem +import akka.testkit.AkkaSpec -class IterableProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { +class IterableProducerTest(_system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long) + extends PublisherVerification[Int](env, publisherShutdownTimeout) + with WithActorSystem with TestNGSuiteLike { + + implicit val system = _system + + def this(system: ActorSystem) { + this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system)), Timeouts.publisherShutdownTimeoutMillis) + } + + def this() { + this(ActorSystem(classOf[IterableProducerTest].getSimpleName, AkkaSpec.testConf)) + } val materializer = FlowMaterializer(MaterializerSettings( maximumInputBufferSize = 512))(system) @@ -26,4 +40,5 @@ class IterableProducerTest extends PublisherVerification[Int] with WithActorSyst override def createCompletedStatePublisher(): Publisher[Int] = Flow[Int](Nil).toProducer(materializer).getPublisher + override def createErrorStatePublisher(): Publisher[Int] = null // ignore error-state tests } \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala index 372486df51..61ef3dd045 100644 --- a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala @@ -5,10 +5,24 @@ package akka.stream import org.scalatest.testng.TestNGSuiteLike import org.reactivestreams.spi.Publisher -import org.reactivestreams.tck.PublisherVerification +import org.reactivestreams.tck.{ TestEnvironment, PublisherVerification } import akka.stream.scaladsl.Flow +import akka.actor.ActorSystem +import akka.testkit.AkkaSpec -class IteratorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { +class IteratorProducerTest(_system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long) + extends PublisherVerification[Int](env, publisherShutdownTimeout) + with WithActorSystem with TestNGSuiteLike { + + implicit val system = _system + + def this(system: ActorSystem) { + this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system)), Timeouts.publisherShutdownTimeoutMillis) + } + + def this() { + this(ActorSystem(classOf[IteratorProducerTest].getSimpleName, AkkaSpec.testConf)) + } val materializer = FlowMaterializer(MaterializerSettings( maximumInputBufferSize = 512))(system) @@ -25,4 +39,5 @@ class IteratorProducerTest extends PublisherVerification[Int] with WithActorSyst override def createCompletedStatePublisher(): Publisher[Int] = Flow(List.empty[Int].iterator).toProducer(materializer).getPublisher + override def createErrorStatePublisher(): Publisher[Int] = null // ignore error-state tests } \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/Timeouts.scala b/akka-stream/src/test/scala/akka/stream/Timeouts.scala index 6761dc2885..4656a663f6 100644 --- a/akka-stream/src/test/scala/akka/stream/Timeouts.scala +++ b/akka-stream/src/test/scala/akka/stream/Timeouts.scala @@ -10,13 +10,11 @@ import akka.testkit._ /** * Specifies timeouts for the TCK */ -trait Timeouts { +object Timeouts { def publisherShutdownTimeoutMillis: Int = 1000 - def defaultTimeoutMillis: Int = + def defaultTimeoutMillis(implicit system: ActorSystem): Int = 500.millis.dilated(system).toMillis.toInt - def system: ActorSystem - } diff --git a/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala b/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala index f36b237a0a..976dd6b1eb 100644 --- a/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala +++ b/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala @@ -7,8 +7,8 @@ import akka.actor.ActorSystem import org.testng.annotations.AfterClass import akka.testkit.AkkaSpec -trait WithActorSystem extends Timeouts { - implicit val system: ActorSystem = ActorSystem(getClass.getSimpleName, AkkaSpec.testConf) +trait WithActorSystem { + def system: ActorSystem @AfterClass def shutdownActorSystem(): Unit = system.shutdown()