=str #15755 #15756 correct variance annotations

- make HasNoX into pure marker traits
- make materialization type-safe
- remove casting when using EmptyPublisher
This commit is contained in:
Roland Kuhn 2014-09-01 21:17:23 +02:00
parent 0046bebdfe
commit 5ba32ccc50
7 changed files with 97 additions and 121 deletions

View file

@ -76,13 +76,13 @@ private[akka] object Ast {
final case class IteratorPublisherNode[I](iterator: Iterator[I]) extends PublisherNode[I] {
final def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
if (iterator.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]]
if (iterator.isEmpty) EmptyPublisher[I]
else ActorPublisher[I](materializer.actorOf(IteratorPublisher.props(iterator, materializer.settings),
name = s"$flowName-0-iterator"))
}
final case class IterablePublisherNode[I](iterable: immutable.Iterable[I]) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]]
if (iterable.isEmpty) EmptyPublisher[I]
else ActorPublisher[I](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings),
name = s"$flowName-0-iterable"), Some(iterable))
}

View file

@ -10,6 +10,7 @@ import org.reactivestreams.{ Subscriber, Publisher }
*/
private[akka] case object EmptyPublisher extends Publisher[Nothing] {
def subscribe(subscriber: Subscriber[Nothing]): Unit = subscriber.onComplete()
def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]]
}
/**
@ -17,4 +18,5 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] {
*/
private[akka] case class ErrorPublisher(t: Throwable) extends Publisher[Nothing] {
def subscribe(subscriber: Subscriber[Nothing]): Unit = subscriber.onError(t)
def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]]
}

View file

@ -14,7 +14,7 @@ import scala.util.control.NonFatal
*/
private[akka] object SynchronousPublisherFromIterable {
def apply[T](iterable: immutable.Iterable[T]): Publisher[T] =
if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[T]]
if (iterable.isEmpty) EmptyPublisher[T]
else new SynchronousPublisherFromIterable(iterable)
private class IteratorSubscription[T](subscriber: Subscriber[T], iterator: Iterator[T]) extends Subscription {

View file

@ -15,7 +15,6 @@ import scala.util.{ Failure, Success }
import akka.stream.Transformer
import akka.stream.scaladsl2.FlowMaterializer
import akka.stream.MaterializerSettings
import akka.stream.impl.EmptyPublisher
import akka.stream.impl.ActorPublisher
import akka.stream.impl.IterablePublisher
import akka.stream.impl.TransformProcessorImpl

View file

@ -6,9 +6,7 @@ package akka.stream.scaladsl2
import scala.language.higherKinds
import scala.collection.immutable
import scala.concurrent.Future
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import akka.stream.Transformer
import akka.stream._
import akka.stream.impl.BlackholeSubscriber
import akka.stream.impl2.Ast._
import scala.annotation.unchecked.uncheckedVariance
@ -17,6 +15,7 @@ import scala.concurrent.Promise
import akka.stream.impl.EmptyPublisher
import akka.stream.impl.IterablePublisher
import akka.stream.impl2.ActorBasedFlowMaterializer
import org.reactivestreams._
sealed trait Flow
@ -39,42 +38,45 @@ object FlowFrom {
def apply[T](p: Publisher[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(PublisherSource(p))
}
trait Source[-In] {
def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) @uncheckedVariance
trait Source[+In] {
def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In @uncheckedVariance], Any)
}
trait SourceKey[+In, T] extends Source[In] {
override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In @uncheckedVariance], T)
// these are unique keys, case class equality would break them
override def equals(other: AnyRef): Boolean = this eq other
}
/**
* Default input.
* Allows to materialize a Flow with this input to Subscriber.
*/
final case class SubscriberSource[In]() extends Source[In] {
override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) = {
final case class SubscriberSource[In]() extends SourceKey[In, Subscriber[In]] {
override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], Subscriber[In]) = {
val identityProcessor = materializer.identityProcessor[In](flowName)
(identityProcessor.asInstanceOf[Publisher[In]], identityProcessor.asInstanceOf[Subscriber[In]])
(identityProcessor, identityProcessor)
}
def subscriber[I <: In](m: MaterializedSource): Subscriber[I] =
m.getSourceFor(this).asInstanceOf[Subscriber[I]]
def subscriber(m: MaterializedSource): Subscriber[In] =
m.getSourceFor(this)
}
/**
* [[Source]] from `Publisher`.
*/
final case class PublisherSource[In](p: Publisher[_ >: In]) extends Source[In] {
final case class PublisherSource[In](p: Publisher[In]) extends Source[In] {
override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) =
(p.asInstanceOf[Publisher[In]], p)
(p, p)
}
/**
* [[Source]] from `Iterable`
*
* Changing In from Contravariant to Covariant is needed because Iterable[+A].
* But this brakes IterableSource variance and we get IterableSource(Seq(1,2,3)): IterableSource[Any]
*/
final case class IterableSource[In](iterable: immutable.Iterable[_ >: In]) extends Source[In] {
final case class IterableSource[In](iterable: immutable.Iterable[In]) extends Source[In] {
override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) = {
val p =
if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[In]]
val p: Publisher[In] =
if (iterable.isEmpty) EmptyPublisher[In]
else materializer match {
case m: ActorBasedFlowMaterializer
m.actorPublisher(IterablePublisher.props(iterable, materializer.settings),
@ -82,36 +84,43 @@ final case class IterableSource[In](iterable: immutable.Iterable[_ >: In]) exten
case other
throw new IllegalArgumentException(s"IterableSource requires ActorBasedFlowMaterializer, got [${other.getClass.getName}]")
}
(p.asInstanceOf[Publisher[In]], iterable)
(p, iterable)
}
}
/**
* [[Source]] from `Future`
*
* Changing In from Contravariant to Covariant is needed because Future[+A].
* But this brakes FutureSource variance and we get FutureSource(Future{1}): FutureSource[Any]
*/
final case class FutureSource[In](f: Future[_ >: In]) extends Source[In] {
final case class FutureSource[In](f: Future[In]) extends Source[In] {
override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) = ???
}
trait Sink[+Out] {
def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef
trait Sink[-Out] {
def attach(flowPublisher: Publisher[Out @uncheckedVariance], materializer: FlowMaterializer): Any
}
trait SinkKey[-Out, T] extends Sink[Out] {
override def attach(flowPublisher: Publisher[Out @uncheckedVariance], materializer: FlowMaterializer): T
// these are unique keys, case class equality would break them
override def equals(other: AnyRef): Boolean = this eq other
}
/**
* Default output.
* Allows to materialize a Flow with this output to Publisher.
*/
final case class PublisherSink[+Out]() extends Sink[Out] {
def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = flowPublisher
def publisher[O >: Out](m: MaterializedSink): Publisher[O] = m.getSinkFor(this).asInstanceOf[Publisher[O]]
// FIXME: make case object
final case class PublisherSink[Out]() extends SinkKey[Out, Publisher[Out]] {
def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): Publisher[Out] = flowPublisher
def publisher(m: MaterializedSink): Publisher[Out] = m.getSinkFor(this)
}
final case class BlackholeSink[+Out]() extends Sink[Out] {
override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = {
val s = new BlackholeSubscriber[Out](materializer.settings.maxInputBufferSize)
/**
* Output to nirvana.
*/
final case object BlackholeSink extends Sink[Any] {
override def attach(flowPublisher: Publisher[Any], materializer: FlowMaterializer): AnyRef = {
val s = new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize)
flowPublisher.subscribe(s)
s
}
@ -120,26 +129,19 @@ final case class BlackholeSink[+Out]() extends Sink[Out] {
/**
* [[Sink]] to a Subscriber.
*/
final case class SubscriberSink[+Out](subscriber: Subscriber[_ <: Out]) extends Sink[Out] {
override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = {
flowPublisher.subscribe(subscriber.asInstanceOf[Subscriber[Out]])
final case class SubscriberSink[Out](subscriber: Subscriber[Out]) extends Sink[Out] {
override def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): AnyRef = {
flowPublisher.subscribe(subscriber)
subscriber
}
}
/**
* INTERNAL API
*/
private[akka] object ForeachSink {
private val ListOfUnit = List(())
}
/**
* Foreach output. Invokes the given function for each element. Completes the [[#future]] when
* all elements processed, or stream failed.
*/
final case class ForeachSink[Out](f: Out Unit) extends Sink[Out] { // FIXME variance?
override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = {
final case class ForeachSink[Out](f: Out Unit) extends SinkKey[Out, Future[Unit]] {
override def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): Future[Unit] = {
val promise = Promise[Unit]()
FlowFrom(flowPublisher).transform("foreach", () new Transformer[Out, Unit] {
override def onNext(in: Out) = { f(in); Nil }
@ -153,49 +155,36 @@ final case class ForeachSink[Out](f: Out ⇒ Unit) extends Sink[Out] { // FIXME
}).consume()(materializer)
promise.future
}
def future(m: MaterializedSink): Future[Unit] = m.getSinkFor(this).asInstanceOf[Future[Unit]]
def future(m: MaterializedSink): Future[Unit] = m.getSinkFor(this)
}
/**
* Fold output. Reduces output stream according to the given fold function.
*/
final case class FoldSink[T, +Out](zero: T)(f: (T, Out) T) extends Sink[Out] {
override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = ???
final case class FoldSink[T, Out](zero: T)(f: (T, Out) T) extends Sink[Out] {
override def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): AnyRef = ???
def future: Future[T] = ???
}
/**
* Operations with a Flow which has no attached [[Source]].
*
* No Out type parameter would be useful for Graph signatures, but we need it here
* for `withSource` and `prependTransform` methods.
* Marker interface for flows that have a free (attachable) input side.
*/
sealed trait HasNoSource[-In, +Out] extends Flow {
type Repr[-In, +Out] <: HasNoSource[In, Out]
type AfterAttachingSource[-In, +Out] <: Flow
def withSource[I <: In](in: Source[I]): AfterAttachingSource[I, Out]
def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out]
def prepend[T](f: FlowWithSource[T, In]): Repr[T, Out]#AfterAttachingSource[T, Out]
}
sealed trait HasNoSource[-In] extends Flow
/**
* Operations with a Flow which has no attached [[Sink]].
*
* No In type parameter would be useful for Graph signatures, but we need it here
* for `withSink`.
* Marker interface for flows that have a free (attachable) output side.
*/
trait HasNoSink[-In, +Out] extends Flow {
type Repr[-In, +Out] <: HasNoSink[In, Out]
type AfterAttachingSink[-In, +Out] <: Flow
sealed trait HasNoSink[+Out] extends Flow
/**
* Operations offered by flows with a free output side: the DSL flows left-to-right only.
*/
trait FlowOps[-In, +Out] extends HasNoSink[Out] {
type Repr[-I, +O] <: FlowOps[I, O]
// Storing ops in reverse order
protected def andThen[U](op: AstNode): Repr[In, U]
def withSink[O >: Out](out: Sink[O]): AfterAttachingSink[In, O]
def map[T](f: Out T): Repr[In, T] =
transform("map", () new Transformer[Out, T] {
override def onNext(in: Out) = List(f(in))
@ -204,52 +193,39 @@ trait HasNoSink[-In, +Out] extends Flow {
def transform[T](name: String, mkTransformer: () Transformer[Out, T]): Repr[In, T] = {
andThen(Transform(name, mkTransformer.asInstanceOf[() Transformer[Any, Any]]))
}
def append[T](f: ProcessorFlow[Out, T]): Repr[In, T]
def append[T](f: FlowWithSink[Out, T]): Repr[In, T]#AfterAttachingSink[In, T]
}
/**
* Flow without attached input and without attached output, can be used as a `Processor`.
*/
final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends HasNoSink[In, Out] with HasNoSource[In, Out] {
override type Repr[-In, +Out] = ProcessorFlow[In, Out]
type AfterAttachingSink[-In, +Out] = FlowWithSink[In, Out]
type AfterAttachingSource[-In, +Out] = FlowWithSource[In, Out]
final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends FlowOps[In, Out] with HasNoSource[In] {
override type Repr[-I, +O] = ProcessorFlow[I, O]
override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops)
override def withSink[O >: Out](out: Sink[O]): AfterAttachingSink[In, O] = FlowWithSink(out, ops)
override def withSource[I <: In](in: Source[I]): AfterAttachingSource[I, Out] = FlowWithSource(in, ops)
def withSink(out: Sink[Out]): FlowWithSink[In, Out] = FlowWithSink(out, ops)
def withSource(in: Source[In]): FlowWithSource[In, Out] = FlowWithSource(in, ops)
override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] =
ProcessorFlow(ops ::: f.ops)
override def prepend[T](f: FlowWithSource[T, In]): Repr[T, Out]#AfterAttachingSource[T, Out] =
FlowWithSource(f.input, ops ::: f.ops)
def prepend[T](f: ProcessorFlow[T, In]): ProcessorFlow[T, Out] = ProcessorFlow(ops ::: f.ops)
def prepend[T](f: FlowWithSource[T, In]): FlowWithSource[T, Out] = f.append(this)
override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = ProcessorFlow(f.ops ++: ops)
override def append[T](f: FlowWithSink[Out, T]): Repr[In, T]#AfterAttachingSink[In, T] =
FlowWithSink(f.output, f.ops ++: ops)
def append[T](f: ProcessorFlow[Out, T]): ProcessorFlow[In, T] = ProcessorFlow(f.ops ++: ops)
def append[T](f: FlowWithSink[Out, T]): FlowWithSink[In, T] = f.prepend(this)
}
/**
* Flow with attached output, can be used as a `Subscriber`.
*/
final case class FlowWithSink[-In, +Out](output: Sink[Out], ops: List[AstNode]) extends HasNoSource[In, Out] {
type Repr[-In, +Out] = FlowWithSink[In, Out]
type AfterAttachingSource[-In, +Out] = RunnableFlow[In, Out]
final case class FlowWithSink[-In, +Out](private[scaladsl2] val output: Sink[Out @uncheckedVariance], ops: List[AstNode]) extends HasNoSource[In] {
override def withSource[I <: In](in: Source[I]): AfterAttachingSource[I, Out] = RunnableFlow(in, output, ops)
def withSource(in: Source[In]): RunnableFlow[In, Out] = new RunnableFlow(in, output, ops)
def withoutSink: ProcessorFlow[In, Out] = ProcessorFlow(ops)
override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] =
FlowWithSink(output, ops ::: f.ops)
override def prepend[T](f: FlowWithSource[T, In]): Repr[T, Out]#AfterAttachingSource[T, Out] =
RunnableFlow(f.input, output, ops ::: f.ops)
def prepend[T](f: ProcessorFlow[T, In]): FlowWithSink[T, Out] = FlowWithSink(output, ops ::: f.ops)
def prepend[T](f: FlowWithSource[T, In]): RunnableFlow[T, Out] = new RunnableFlow(f.input, output, ops ::: f.ops)
def toSubscriber[I <: In]()(implicit materializer: FlowMaterializer): Subscriber[I] = {
val subIn = SubscriberSource[I]()
def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] = {
val subIn = SubscriberSource[In]()
val mf = withSource(subIn).run()
subIn.subscriber(mf)
}
@ -258,37 +234,36 @@ final case class FlowWithSink[-In, +Out](output: Sink[Out], ops: List[AstNode])
/**
* Flow with attached input, can be used as a `Publisher`.
*/
final case class FlowWithSource[-In, +Out](input: Source[In], ops: List[AstNode]) extends HasNoSink[In, Out] {
override type Repr[-In, +Out] = FlowWithSource[In, Out]
type AfterAttachingSink[-In, +Out] = RunnableFlow[In, Out]
final case class FlowWithSource[-In, +Out](private[scaladsl2] val input: Source[In @uncheckedVariance], ops: List[AstNode]) extends FlowOps[In, Out] {
override type Repr[-I, +O] = FlowWithSource[I, O]
override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops)
override def withSink[O >: Out](out: Sink[O]): AfterAttachingSink[In, O] = RunnableFlow(input, out, ops)
def withSink(out: Sink[Out]): RunnableFlow[In, Out] = new RunnableFlow(input, out, ops)
def withoutSource: ProcessorFlow[In, Out] = ProcessorFlow(ops)
override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = FlowWithSource(input, f.ops ++: ops)
override def append[T](f: FlowWithSink[Out, T]): Repr[In, T]#AfterAttachingSink[In, T] =
RunnableFlow(input, f.output, f.ops ++: ops)
def append[T](f: ProcessorFlow[Out, T]): FlowWithSource[In, T] = FlowWithSource(input, f.ops ++: ops)
def append[T](f: FlowWithSink[Out, T]): RunnableFlow[In, T] = new RunnableFlow(input, f.output, f.ops ++: ops)
def toPublisher[U >: Out]()(implicit materializer: FlowMaterializer): Publisher[U] = {
def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = {
val pubOut = PublisherSink[Out]()
val mf = withSink(pubOut).run()
pubOut.publisher(mf)
}
def publishTo(subscriber: Subscriber[_ >: Out])(implicit materializer: FlowMaterializer): Unit =
toPublisher().subscribe(subscriber.asInstanceOf[Subscriber[Out]])
def publishTo(subscriber: Subscriber[Out @uncheckedVariance])(implicit materializer: FlowMaterializer): Unit =
toPublisher().subscribe(subscriber)
def consume()(implicit materializer: FlowMaterializer): Unit =
withSink(BlackholeSink()).run()
withSink(BlackholeSink).run()
}
/**
* Flow with attached input and output, can be executed.
*/
final case class RunnableFlow[-In, +Out](input: Source[In], output: Sink[Out], ops: List[AstNode]) extends Flow {
final case class RunnableFlow[-In, +Out](private[scaladsl2] val input: Source[In @uncheckedVariance],
private[scaladsl2] val output: Sink[Out @uncheckedVariance], ops: List[AstNode]) extends Flow {
def withoutSink: FlowWithSource[In, Out] = FlowWithSource(input, ops)
def withoutSource: FlowWithSink[In, Out] = FlowWithSink(output, ops)
@ -296,20 +271,20 @@ final case class RunnableFlow[-In, +Out](input: Source[In], output: Sink[Out], o
materializer.materialize(input, output, ops)
}
class MaterializedFlow(sourceKey: AnyRef, matSource: AnyRef, sinkKey: AnyRef, matSink: AnyRef) extends MaterializedSource with MaterializedSink {
override def getSourceFor(key: AnyRef): AnyRef =
if (key == sourceKey) matSource
class MaterializedFlow(sourceKey: AnyRef, matSource: Any, sinkKey: AnyRef, matSink: Any) extends MaterializedSource with MaterializedSink {
override def getSourceFor[T](key: SourceKey[_, T]): T =
if (key == sourceKey) matSource.asInstanceOf[T]
else throw new IllegalArgumentException(s"Source key [$key] doesn't match the source [$sourceKey] of this flow")
def getSinkFor(key: AnyRef): AnyRef =
if (key == sinkKey) matSink
def getSinkFor[T](key: SinkKey[_, T]): T =
if (key == sinkKey) matSink.asInstanceOf[T]
else throw new IllegalArgumentException(s"Sink key [$key] doesn't match the sink [$sinkKey] of this flow")
}
trait MaterializedSource {
def getSourceFor(sourceKey: AnyRef): AnyRef
def getSourceFor[T](sourceKey: SourceKey[_, T]): T
}
trait MaterializedSink {
def getSinkFor(sinkKey: AnyRef): AnyRef
def getSinkFor[T](sinkKey: SinkKey[_, T]): T
}

View file

@ -192,7 +192,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
override def cleanup() = cleanupProbe.ref ! s
}).
withSink(BlackholeSink()).run()
withSink(BlackholeSink).run()
cleanupProbe.expectMsg("a")
}

View file

@ -25,7 +25,7 @@ object StreamTestKit {
*/
def errorPublisher[T](cause: Throwable): Publisher[T] = ErrorPublisher(cause: Throwable).asInstanceOf[Publisher[T]]
def emptyPublisher[T](): Publisher[T] = EmptyPublisher.asInstanceOf[Publisher[T]]
def emptyPublisher[T](): Publisher[T] = EmptyPublisher[T]
/**
* Subscribes the subscriber and signals error after the first request.