diff --git a/akka-stream/src/main/scala/akka/stream/Stream.scala b/akka-stream/src/main/scala/akka/stream/Stream.scala index bb51a7a857..f48568b933 100644 --- a/akka-stream/src/main/scala/akka/stream/Stream.scala +++ b/akka-stream/src/main/scala/akka/stream/Stream.scala @@ -16,6 +16,7 @@ import scala.util.control.NoStackTrace import akka.stream.impl.Ast.IteratorProducerNode import akka.stream.impl.Ast.IterableProducerNode import akka.stream.impl.Ast.ExistingProducer +import scala.annotation.unchecked.uncheckedVariance object Stream { def apply[T](producer: Producer[T]): Stream[T] = StreamImpl(ExistingProducer(producer), Nil) @@ -27,7 +28,7 @@ object Stream { object Stop extends RuntimeException with NoStackTrace } -trait Stream[T] { +trait Stream[+T] { def map[U](f: T ⇒ U): Stream[U] def filter(p: T ⇒ Boolean): Stream[T] def foreach(c: T ⇒ Unit): Stream[Unit] @@ -45,16 +46,16 @@ trait Stream[T] { onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U] - def groupBy[K](f: T ⇒ K): Stream[(K, Producer[T])] - def splitWhen(p: T ⇒ Boolean): Stream[Producer[T]] + def groupBy[K](f: T ⇒ K): Stream[(K, Producer[T @uncheckedVariance])] + def splitWhen(p: T ⇒ Boolean): Stream[Producer[T @uncheckedVariance]] - def merge(other: Producer[T]): Stream[T] + def merge[U >: T](other: Producer[U]): Stream[U] def zip[U](other: Producer[U]): Stream[(T, U)] - def concat(next: Producer[T]): Stream[T] + def concat[U >: T](next: Producer[U]): Stream[U] def toFuture(generator: ProcessorGenerator): Future[T] def consume(generator: ProcessorGenerator): Unit - def toProducer(generator: ProcessorGenerator): Producer[T] + def toProducer(generator: ProcessorGenerator): Producer[T @uncheckedVariance] } // FIXME is Processor the right naming here? diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala index f5d8f0c7b1..cf5ef50082 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala @@ -63,9 +63,9 @@ private[akka] case class StreamImpl[I, O](producerNode: Ast.ProducerNode[I], ops override def zip[O2](other: Producer[O2]): Stream[(O, O2)] = andThen(Zip(other.asInstanceOf[Producer[Any]])) - override def concat(next: Producer[O]): Stream[O] = andThen(Concat(next.asInstanceOf[Producer[Any]])) + override def concat[U >: O](next: Producer[U]): Stream[U] = andThen(Concat(next.asInstanceOf[Producer[Any]])) - override def merge(other: Producer[O]): Stream[O] = andThen(Merge(other.asInstanceOf[Producer[Any]])) + override def merge[U >: O](other: Producer[U]): Stream[U] = andThen(Merge(other.asInstanceOf[Producer[Any]])) override def splitWhen(p: (O) ⇒ Boolean): Stream[Producer[O]] = andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean])) diff --git a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala index 4aae17bdbe..2ae302ca95 100644 --- a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala @@ -23,7 +23,7 @@ class IterableProducerTest extends PublisherVerification[Int] with WithActorSyst } override def createCompletedStatePublisher(): Publisher[Int] = - Stream(Nil).toProducer(gen).getPublisher + Stream[Int](Nil).toProducer(gen).getPublisher override def publisherShutdownTimeoutMillis: Int = 1000 } \ No newline at end of file