=str #16040 Use scaladsl2 in TCK tests

* found and fixed one issue: onError was invoked twice for the overflow
  error
* and also noticed that we restart the actors, instead of stopping them
  in case of internal error, changed to stopping strategy
This commit is contained in:
Patrik Nordwall 2014-10-06 14:04:05 +02:00
parent afd45a09f4
commit 4f362f91df
12 changed files with 70 additions and 84 deletions

View file

@ -3,9 +3,11 @@
*/
package akka.stream.tck
import scala.collection.immutable
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl2.FlowMaterializer
import akka.stream.scaladsl2.PublisherDrain
import akka.stream.scaladsl2.Source
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.testkit.EventFilter
@ -15,8 +17,6 @@ import org.reactivestreams.tck.IdentityProcessorVerification
import org.reactivestreams.tck.TestEnvironment
import org.scalatest.testng.TestNGSuiteLike
import scala.collection.immutable
abstract class AkkaIdentityProcessorVerification[T](val system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long)
extends IdentityProcessorVerification[T](env, publisherShutdownTimeout)
with TestNGSuiteLike {
@ -48,7 +48,7 @@ abstract class AkkaIdentityProcessorVerification[T](val system: ActorSystem, env
if (elements == Long.MaxValue) 1 to Int.MaxValue
else 0 until elements.toInt
Flow(iterable).toPublisher()
Source(iterable).runWith(PublisherDrain())
}
/** By default Akka Publishers do not support Fanout! */

View file

@ -3,15 +3,17 @@
*/
package akka.stream.tck
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.MaterializerSettings
import akka.stream.scaladsl2.FlowMaterializer
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import org.reactivestreams.Publisher
import org.reactivestreams.tck.{ PublisherVerification, TestEnvironment }
import org.scalatest.testng.TestNGSuiteLike
import org.testng.annotations.AfterClass
import scala.concurrent.duration._
abstract class AkkaPublisherVerification[T](val system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long)
extends PublisherVerification[T](env, publisherShutdownTimeout)

View file

@ -3,10 +3,13 @@
*/
package akka.stream.tck
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.MaterializerSettings
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl2.FlowMaterializer
import akka.stream.scaladsl2.PublisherDrain
import akka.stream.scaladsl2.Source
import akka.stream.testkit.AkkaSpec
import org.reactivestreams.Publisher
import org.reactivestreams.tck.SubscriberBlackboxVerification
@ -15,9 +18,6 @@ import org.reactivestreams.tck.TestEnvironment
import org.scalatest.testng.TestNGSuiteLike
import org.testng.annotations.AfterClass
import scala.collection.immutable
import scala.concurrent.duration._
abstract class AkkaSubscriberBlackboxVerification[T](val system: ActorSystem, env: TestEnvironment)
extends SubscriberBlackboxVerification[T](env) with TestNGSuiteLike
with AkkaSubscriberVerificationLike {
@ -62,7 +62,7 @@ trait AkkaSubscriberVerificationLike {
if (elements == Long.MaxValue) 1 to Int.MaxValue
else 0 until elements.toInt
Flow(iterable).toPublisher()
Source(iterable).runWith(PublisherDrain())
}
@AfterClass

View file

@ -1,41 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import java.util.concurrent.atomic.AtomicInteger
import akka.stream._
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.stream.impl.Ast
import org.reactivestreams.Processor
import org.reactivestreams.Publisher
class FanoutProcessorTest extends AkkaIdentityProcessorVerification[Int] {
val processorCounter = new AtomicInteger
override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize)
implicit val materializer = FlowMaterializer(settings)(system)
val flowName = getClass.getSimpleName + "-" + processorCounter.incrementAndGet()
val processor = materializer.asInstanceOf[ActorBasedFlowMaterializer].processorForNode(
Ast.FanoutBox(initialBufferSize = maxBufferSize / 2, maxBufferSize), flowName, 1)
processor.asInstanceOf[Processor[Int, Int]]
}
override def createHelperPublisher(elements: Long): Publisher[Int] = {
implicit val mat = FlowMaterializer()(system)
createSimpleIntPublisher(elements)(mat)
}
/** The Fanout Processor actually supports fanout */
override def maxElementsFromPublisher = Long.MaxValue
}

View file

@ -0,0 +1,21 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import scala.collection.immutable
import akka.stream.scaladsl2.FanoutPublisherDrain
import akka.stream.scaladsl2.Source
import org.reactivestreams.Publisher
class FanoutPublisherTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val iterable: immutable.Iterable[Int] =
if (elements == 0) new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
else 0 until elements.toInt
Source(iterable).runWith(FanoutPublisherDrain(initialBufferSize = 2, maximumBufferSize = 4))
}
}

View file

@ -3,10 +3,10 @@
*/
package akka.stream.tck
import akka.stream.scaladsl.Flow
import org.reactivestreams._
import scala.collection.immutable
import akka.stream.scaladsl2.PublisherDrain
import akka.stream.scaladsl2.Source
import org.reactivestreams._
class IterablePublisherTest extends AkkaPublisherVerification[Int] {
@ -17,7 +17,7 @@ class IterablePublisherTest extends AkkaPublisherVerification[Int] {
else
0 until elements.toInt
Flow(iterable).toPublisher()
Source(iterable).runWith(PublisherDrain())
}
}

View file

@ -3,10 +3,10 @@
*/
package akka.stream.tck
import akka.stream.scaladsl.Flow
import org.reactivestreams.Publisher
import scala.collection.immutable
import akka.stream.scaladsl2.PublisherDrain
import akka.stream.scaladsl2.Source
import org.reactivestreams.Publisher
class IteratorPublisherTest extends AkkaPublisherVerification[Int](true) {
@ -15,7 +15,7 @@ class IteratorPublisherTest extends AkkaPublisherVerification[Int](true) {
if (elements == 0) new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
else 0 until elements.toInt
Flow(iterable).toPublisher()
Source(iterable).runWith(PublisherDrain())
}
}

View file

@ -3,7 +3,8 @@
*/
package akka.stream.tck
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl2.PublisherDrain
import akka.stream.scaladsl2.Source
import org.reactivestreams._
class SimpleCallbackPublisherTest extends AkkaPublisherVerification[Int] {
@ -11,7 +12,7 @@ class SimpleCallbackPublisherTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val iter = Iterator from 0
val iter2 = if (elements > 0) iter take elements.toInt else iter
Flow(() if (iter2.hasNext) Some(iter2.next()) else None).toPublisher()
Source(() if (iter2.hasNext) Some(iter2.next()) else None).runWith(PublisherDrain())
}
}

View file

@ -3,11 +3,12 @@
*/
package akka.stream.tck
import akka.stream.MaterializerSettings
import akka.stream.Transformer
import akka.stream.impl2.ActorBasedFlowMaterializer
import akka.stream.impl2.Ast
import akka.stream.scaladsl2.FlowMaterializer
import java.util.concurrent.atomic.AtomicInteger
import akka.stream._
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.stream.impl.Ast
import org.reactivestreams.Processor
import org.reactivestreams.Publisher

View file

@ -49,7 +49,7 @@ object FlowSpec {
namePrefix: String,
brokenMessage: Any) extends ActorBasedFlowMaterializer(settings, supervisor, flowNameCounter, namePrefix) {
override protected def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
override def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
val props = op match {
case t: Transform Props(new BrokenTransformProcessorImpl(settings, t.mkTransformer(), brokenMessage))
case o ActorProcessorFactory.props(this, o)
@ -556,20 +556,18 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
try {
system.eventStream.publish(Mute(filters))
EventFilter[akka.actor.PostRestartException](occurrences = 1) intercept {
upstream.expectRequest(upstreamSubscription, 1)
upstreamSubscription.sendNext("a3")
upstreamSubscription.expectCancellation()
upstream.expectRequest(upstreamSubscription, 1)
upstreamSubscription.sendNext("a3")
upstreamSubscription.expectCancellation()
// IllegalStateException terminated abruptly
checkError(downstream)
checkError(downstream2)
// IllegalStateException terminated abruptly
checkError(downstream)
checkError(downstream2)
val downstream3 = StreamTestKit.SubscriberProbe[Any]()
publisher.subscribe(downstream3)
// IllegalStateException terminated abruptly
checkError(downstream3)
}
val downstream3 = StreamTestKit.SubscriberProbe[Any]()
publisher.subscribe(downstream3)
// IllegalStateException terminated abruptly
checkError(downstream3)
} finally {
system.eventStream.publish(UnMute(filters))
}

View file

@ -226,7 +226,6 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
if (downstreamDemand < 0) {
// Long has overflown
val demandOverflowException = new IllegalStateException(ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue)
subscriber.onError(demandOverflowException)
cancel(demandOverflowException)
}
@ -293,7 +292,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin
override def postRestart(reason: Throwable): Unit = {
super.postRestart(reason)
throw new IllegalStateException("This actor cannot be restarted")
throw new IllegalStateException("This actor cannot be restarted", reason)
}
}

View file

@ -194,7 +194,10 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
override def onNext(element: Any) = List(element)
})
protected def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
/**
* INTERNAL API
*/
private[akka] def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
val impl = actorOf(ActorProcessorFactory.props(this, op), s"$flowName-$n-${op.name}")
ActorProcessorFactory(impl)
}
@ -284,6 +287,8 @@ private[akka] object StreamSupervisor {
private[akka] class StreamSupervisor(settings: MaterializerSettings) extends Actor {
import StreamSupervisor._
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
def receive = {
case Materialize(props, name)
val impl = context.actorOf(props, name)