!str make Stream[+T] covariant
This commit is contained in:
parent
ac1c6883c9
commit
c61958906f
3 changed files with 10 additions and 9 deletions
|
|
@ -16,6 +16,7 @@ import scala.util.control.NoStackTrace
|
|||
import akka.stream.impl.Ast.IteratorProducerNode
|
||||
import akka.stream.impl.Ast.IterableProducerNode
|
||||
import akka.stream.impl.Ast.ExistingProducer
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
|
||||
object Stream {
|
||||
def apply[T](producer: Producer[T]): Stream[T] = StreamImpl(ExistingProducer(producer), Nil)
|
||||
|
|
@ -27,7 +28,7 @@ object Stream {
|
|||
object Stop extends RuntimeException with NoStackTrace
|
||||
}
|
||||
|
||||
trait Stream[T] {
|
||||
trait Stream[+T] {
|
||||
def map[U](f: T ⇒ U): Stream[U]
|
||||
def filter(p: T ⇒ Boolean): Stream[T]
|
||||
def foreach(c: T ⇒ Unit): Stream[Unit]
|
||||
|
|
@ -45,16 +46,16 @@ trait Stream[T] {
|
|||
onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil,
|
||||
isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U]
|
||||
|
||||
def groupBy[K](f: T ⇒ K): Stream[(K, Producer[T])]
|
||||
def splitWhen(p: T ⇒ Boolean): Stream[Producer[T]]
|
||||
def groupBy[K](f: T ⇒ K): Stream[(K, Producer[T @uncheckedVariance])]
|
||||
def splitWhen(p: T ⇒ Boolean): Stream[Producer[T @uncheckedVariance]]
|
||||
|
||||
def merge(other: Producer[T]): Stream[T]
|
||||
def merge[U >: T](other: Producer[U]): Stream[U]
|
||||
def zip[U](other: Producer[U]): Stream[(T, U)]
|
||||
def concat(next: Producer[T]): Stream[T]
|
||||
def concat[U >: T](next: Producer[U]): Stream[U]
|
||||
|
||||
def toFuture(generator: ProcessorGenerator): Future[T]
|
||||
def consume(generator: ProcessorGenerator): Unit
|
||||
def toProducer(generator: ProcessorGenerator): Producer[T]
|
||||
def toProducer(generator: ProcessorGenerator): Producer[T @uncheckedVariance]
|
||||
}
|
||||
|
||||
// FIXME is Processor the right naming here?
|
||||
|
|
|
|||
|
|
@ -63,9 +63,9 @@ private[akka] case class StreamImpl[I, O](producerNode: Ast.ProducerNode[I], ops
|
|||
|
||||
override def zip[O2](other: Producer[O2]): Stream[(O, O2)] = andThen(Zip(other.asInstanceOf[Producer[Any]]))
|
||||
|
||||
override def concat(next: Producer[O]): Stream[O] = andThen(Concat(next.asInstanceOf[Producer[Any]]))
|
||||
override def concat[U >: O](next: Producer[U]): Stream[U] = andThen(Concat(next.asInstanceOf[Producer[Any]]))
|
||||
|
||||
override def merge(other: Producer[O]): Stream[O] = andThen(Merge(other.asInstanceOf[Producer[Any]]))
|
||||
override def merge[U >: O](other: Producer[U]): Stream[U] = andThen(Merge(other.asInstanceOf[Producer[Any]]))
|
||||
|
||||
override def splitWhen(p: (O) ⇒ Boolean): Stream[Producer[O]] = andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean]))
|
||||
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ class IterableProducerTest extends PublisherVerification[Int] with WithActorSyst
|
|||
}
|
||||
|
||||
override def createCompletedStatePublisher(): Publisher[Int] =
|
||||
Stream(Nil).toProducer(gen).getPublisher
|
||||
Stream[Int](Nil).toProducer(gen).getPublisher
|
||||
|
||||
override def publisherShutdownTimeoutMillis: Int = 1000
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue