From 2c7fd89479f6ecb4fae0e946e1692723be79430e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 8 Aug 2014 16:35:53 +0200 Subject: [PATCH] +str #15408 Replace uncheckedVariance annotations in Flow/Duct --- .../main/scala/akka/stream/impl/FlowImpl.scala | 16 ++++++++-------- .../main/scala/akka/stream/javadsl/Duct.scala | 4 ---- .../main/scala/akka/stream/scaladsl/Duct.scala | 11 +++++------ .../main/scala/akka/stream/scaladsl/Flow.scala | 9 ++++----- .../src/test/scala/akka/stream/DuctSpec.scala | 16 ++++++++++++++++ .../src/test/scala/akka/stream/FlowSpec.scala | 18 +++++++++++++++++- 6 files changed, 50 insertions(+), 24 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index a25215164a..6f29f1ef56 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -48,7 +48,7 @@ private[akka] case class FlowImpl[I, O](publisherNode: Ast.PublisherNode[I], ops } override def consume(materializer: FlowMaterializer): Unit = - produceTo(new BlackholeSubscriber(materializer.settings.maximumInputBufferSize), materializer) + produceTo(new BlackholeSubscriber[Any](materializer.settings.maximumInputBufferSize), materializer) override def onComplete(callback: Try[Unit] ⇒ Unit, materializer: FlowMaterializer): Unit = transform(new Transformer[O, Unit] { @@ -63,7 +63,7 @@ private[akka] case class FlowImpl[I, O](publisherNode: Ast.PublisherNode[I], ops } }).consume(materializer) - override def toPublisher(materializer: FlowMaterializer): Publisher[O] = materializer.toPublisher(publisherNode, ops) + override def toPublisher[U >: O](materializer: FlowMaterializer): Publisher[U] = materializer.toPublisher(publisherNode, ops) override def produceTo(subscriber: Subscriber[_ >: O], materializer: FlowMaterializer): Unit = toPublisher(materializer).subscribe(subscriber.asInstanceOf[Subscriber[O]]) @@ -88,11 +88,11 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[ override def appendJava[U](duct: akka.stream.javadsl.Duct[_ >: Out, U]): Duct[In, U] = copy(ops = duct.ops ++: ops) - override def produceTo(subscriber: Subscriber[Out], materializer: FlowMaterializer): Subscriber[In] = + override def produceTo[U >: Out](subscriber: Subscriber[U], materializer: FlowMaterializer): Subscriber[In] = materializer.ductProduceTo(subscriber, ops) override def consume(materializer: FlowMaterializer): Subscriber[In] = - produceTo(new BlackholeSubscriber(materializer.settings.maximumInputBufferSize), materializer) + produceTo(new BlackholeSubscriber[Any](materializer.settings.maximumInputBufferSize), materializer) override def onComplete(callback: Try[Unit] ⇒ Unit, materializer: FlowMaterializer): Subscriber[In] = transform(new Transformer[Out, Unit] { @@ -107,7 +107,7 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[ } }).consume(materializer) - override def build(materializer: FlowMaterializer): (Subscriber[In], Publisher[Out]) = + override def build[U >: Out](materializer: FlowMaterializer): (Subscriber[In], Publisher[U]) = materializer.ductBuild(ops) override def foreach(c: Out ⇒ Unit, materializer: FlowMaterializer): (Subscriber[In], Future[Unit]) = { @@ -261,7 +261,7 @@ private[akka] trait Builder[Out] { override def name = "takeWithin" }) - def prefixAndTail(n: Int): Thing[(immutable.Seq[Out], Publisher[Out])] = andThen(PrefixAndTail(n)) + def prefixAndTail[U >: Out](n: Int): Thing[(immutable.Seq[Out], Publisher[U])] = andThen(PrefixAndTail(n)) def grouped(n: Int): Thing[immutable.Seq[Out]] = transform(new Transformer[Out, immutable.Seq[Out]] { @@ -319,9 +319,9 @@ private[akka] trait Builder[Out] { def merge[U >: Out](other: Publisher[_ <: U]): Thing[U] = andThen(Merge(other.asInstanceOf[Publisher[Any]])) - def splitWhen(p: (Out) ⇒ Boolean): Thing[Publisher[Out]] = andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean])) + def splitWhen[U >: Out](p: (Out) ⇒ Boolean): Thing[Publisher[U]] = andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean])) - def groupBy[K](f: (Out) ⇒ K): Thing[(K, Publisher[Out])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) + def groupBy[K, U >: Out](f: (Out) ⇒ K): Thing[(K, Publisher[U])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) def broadcast(other: Subscriber[_ >: Out]): Thing[Out] = andThen(Broadcast(other.asInstanceOf[Subscriber[Any]])) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala index 91dc864ae3..0d85591a37 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala @@ -378,10 +378,6 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In, override def transform[U](transformer: Transformer[T, U]): Duct[In, U] = new DuctAdapter(delegate.transform(transformer)) - /** - * Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element - * and a stream representing the remaining elements. - */ override def prefixAndTail(n: Int): Duct[In, Pair[java.util.List[T], Publisher[T]]] = new DuctAdapter(delegate.prefixAndTail(n).map { case (taken, tail) ⇒ Pair(taken.asJava, tail) }) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala index e5b2601770..6c7708fbc9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala @@ -3,7 +3,6 @@ */ package akka.stream.scaladsl -import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.util.Try import org.reactivestreams.{ Publisher, Subscriber } @@ -149,7 +148,7 @@ trait Duct[In, +Out] { * and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair * of an empty collection and a stream containing the whole upstream unchanged. */ - def prefixAndTail(n: Int): Duct[In, (immutable.Seq[Out], Publisher[Out @uncheckedVariance])] + def prefixAndTail[U >: Out](n: Int): Duct[In, (immutable.Seq[Out], Publisher[U])] /** * This operation demultiplexes the incoming stream into separate output @@ -162,7 +161,7 @@ trait Duct[In, +Out] { * care to unblock (or cancel) all of the produced streams even if you want * to consume only one of them. */ - def groupBy[K](f: Out ⇒ K): Duct[In, (K, Publisher[Out @uncheckedVariance])] + def groupBy[K, U >: Out](f: Out ⇒ K): Duct[In, (K, Publisher[U])] /** * This operation applies the given predicate to all incoming elements and @@ -177,7 +176,7 @@ trait Duct[In, +Out] { * true, false, false // elements go into third substream * }}} */ - def splitWhen(p: Out ⇒ Boolean): Duct[In, Publisher[Out @uncheckedVariance]] + def splitWhen[U >: Out](p: Out ⇒ Boolean): Duct[In, Publisher[U]] /** * Merge this stream with the one emitted by the given publisher, taking @@ -272,7 +271,7 @@ trait Duct[In, +Out] { * The given FlowMaterializer decides how the flow’s logical structure is * broken down into individual processing steps. */ - def produceTo(subscriber: Subscriber[Out] @uncheckedVariance, materializer: FlowMaterializer): Subscriber[In] + def produceTo[U >: Out](subscriber: Subscriber[U], materializer: FlowMaterializer): Subscriber[In] /** * Attaches a subscriber to this stream which will just discard all received @@ -308,7 +307,7 @@ trait Duct[In, +Out] { * The given FlowMaterializer decides how the flow’s logical structure is * broken down into individual processing steps. */ - def build(materializer: FlowMaterializer): (Subscriber[In], Publisher[Out] @uncheckedVariance) + def build[U >: Out](materializer: FlowMaterializer): (Subscriber[In], Publisher[U]) /** * Invoke the given procedure for each received element. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index e56b433ccd..68044f0b47 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -3,7 +3,6 @@ */ package akka.stream.scaladsl -import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.Future import scala.util.Try @@ -218,7 +217,7 @@ trait Flow[+T] { * and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair * of an empty collection and a stream containing the whole upstream unchanged. */ - def prefixAndTail(n: Int): Flow[(immutable.Seq[T], Publisher[T @uncheckedVariance])] + def prefixAndTail[U >: T](n: Int): Flow[(immutable.Seq[T], Publisher[U])] /** * This operation demultiplexes the incoming stream into separate output @@ -231,7 +230,7 @@ trait Flow[+T] { * care to unblock (or cancel) all of the produced streams even if you want * to consume only one of them. */ - def groupBy[K](f: T ⇒ K): Flow[(K, Publisher[T @uncheckedVariance])] + def groupBy[K, U >: T](f: T ⇒ K): Flow[(K, Publisher[U])] /** * This operation applies the given predicate to all incoming elements and @@ -246,7 +245,7 @@ trait Flow[+T] { * true, false, false // elements go into third substream * }}} */ - def splitWhen(p: T ⇒ Boolean): Flow[Publisher[T @uncheckedVariance]] + def splitWhen[U >: T](p: T ⇒ Boolean): Flow[Publisher[U]] /** * Merge this stream with the one emitted by the given publisher, taking @@ -372,7 +371,7 @@ trait Flow[+T] { * The given FlowMaterializer decides how the flow’s logical structure is * broken down into individual processing steps. */ - def toPublisher(materializer: FlowMaterializer): Publisher[T @uncheckedVariance] + def toPublisher[U >: T](materializer: FlowMaterializer): Publisher[U] /** * Attaches a subscriber to this stream. diff --git a/akka-stream/src/test/scala/akka/stream/DuctSpec.scala b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala index 15166c983d..8ed1071f00 100644 --- a/akka-stream/src/test/scala/akka/stream/DuctSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala @@ -3,6 +3,7 @@ */ package akka.stream +import scala.collection.immutable import scala.concurrent.duration._ import org.reactivestreams.{ Publisher, Subscriber } import akka.stream.scaladsl.Duct @@ -12,8 +13,14 @@ import akka.stream.testkit.StreamTestKit import scala.util.Success import scala.util.Failure +object DuctSpec { + class Fruit + class Apple extends Fruit +} + @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class DuctSpec extends AkkaSpec { + import DuctSpec._ val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) @@ -197,6 +204,15 @@ class DuctSpec extends AkkaSpec { c.expectComplete } + "be covariant" in { + val d1: Duct[String, Publisher[Fruit]] = Duct[String].map(_ ⇒ new Apple).splitWhen(_ ⇒ true) + val d2: Duct[String, (Boolean, Publisher[Fruit])] = Duct[String].map(_ ⇒ new Apple).groupBy(_ ⇒ true) + val d3: Duct[String, (immutable.Seq[Apple], Publisher[Fruit])] = Duct[String].map(_ ⇒ new Apple).prefixAndTail(1) + val s1: Subscriber[Fruit] = null + val s2: Subscriber[String] = Duct[String].map(_ ⇒ new Apple).produceTo(s1, materializer) + val t: Tuple2[Subscriber[String], Publisher[Fruit]] = Duct[String].map(_ ⇒ new Apple).build(materializer) + } + } } diff --git a/akka-stream/src/test/scala/akka/stream/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala index 00e403655c..501f5ee373 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala @@ -3,15 +3,22 @@ */ package akka.stream +import scala.collection.immutable import scala.concurrent.duration._ import akka.stream.testkit.{ AkkaSpec, ChainSetup, StreamTestKit } import akka.testkit._ import com.typesafe.config.ConfigFactory import akka.stream.scaladsl.Flow +import org.reactivestreams.Publisher + +object FlowSpec { + class Fruit + class Apple extends Fruit +} @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { - + import FlowSpec._ import system.dispatcher val settings = MaterializerSettings( @@ -20,6 +27,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece initialFanOutBufferSize = 1, maxFanOutBufferSize = 16, dispatcher = "akka.test.stream-dispatcher") + val mat = FlowMaterializer(settings) val identity: Flow[Any] ⇒ Flow[Any] = in ⇒ in.map(e ⇒ e) val identity2: Flow[Any] ⇒ Flow[Any] = in ⇒ identity(in) @@ -311,6 +319,14 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "if an internal error occurs subscribers' onError method should be called" in pending "if an internal error occurs future subscribers' onError should be called instead of onSubscribed" in pending + "be covariant" in { + val f1: Flow[Fruit] = Flow(() ⇒ new Apple) + val p1: Publisher[Fruit] = Flow(() ⇒ new Apple).toPublisher(mat) + val f2: Flow[Publisher[Fruit]] = Flow(() ⇒ new Apple).splitWhen(_ ⇒ true) + val f3: Flow[(Boolean, Publisher[Fruit])] = Flow(() ⇒ new Apple).groupBy(_ ⇒ true) + val f4: Flow[(immutable.Seq[Apple], Publisher[Fruit])] = Flow(() ⇒ new Apple).prefixAndTail(1) + } + } object TestException extends RuntimeException