diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index 5272ca50d8..e877a89598 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -14,8 +14,7 @@ object FlowMaterializer { * Scala API: Creates a FlowMaterializer which will execute every step of a transformation * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) - * will be used to create these actors, therefore it is *forbidden* to pass this object - * to another actor if the factory is an ActorContext. + * will be used to create one actor that in turn creates actors for the transformation steps. * * The materializer's [[akka.stream.MaterializerSettings]] will be obtained from the * configuration of the `context`'s underlying [[akka.actor.ActorSystem]]. @@ -83,8 +82,7 @@ object FlowMaterializer { * Java API: Creates a FlowMaterializer which will execute every step of a transformation * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) - * will be used to create these actors, therefore it is *forbidden* to pass this object - * to another actor if the factory is an ActorContext. + * will be used to create one actor that in turn creates actors for the transformation steps. */ def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer = apply(Option(settings), None)(context) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala index 81c24b131f..6eb6390d57 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2013 Typesafe Inc. + * Copyright (C) 2009-2014 Typesafe Inc. */ package akka.stream.impl diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala index a6024b9f51..4bc842d927 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -21,6 +21,9 @@ import akka.stream.impl.IterablePublisher import akka.stream.impl.TransformProcessorImpl import akka.stream.impl.ActorProcessor import akka.stream.impl.ExposedPublisher +import akka.stream.scaladsl2.Source +import akka.stream.scaladsl2.Sink +import akka.stream.scaladsl2.MaterializedFlow /** * INTERNAL API @@ -32,21 +35,6 @@ private[akka] object Ast { case class Transform(name: String, mkTransformer: () ⇒ Transformer[Any, Any]) extends AstNode - trait PublisherNode[I] { - private[akka] def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] - } - - final case class ExistingPublisher[I](publisher: Publisher[I]) extends PublisherNode[I] { - def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String) = publisher - } - - final case class IterablePublisherNode[I](iterable: immutable.Iterable[I]) extends PublisherNode[I] { - def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] = - if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]] - else ActorPublisher[I](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings), - name = s"$flowName-0-iterable"), Some(iterable)) - } - } /** @@ -78,29 +66,36 @@ private[akka] case class ActorBasedFlowMaterializer( } // Ops come in reverse order - override def toPublisher[I, O](publisherNode: PublisherNode[I], ops: List[AstNode]): Publisher[O] = { + override def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow = { val flowName = createFlowName() - if (ops.isEmpty) publisherNode.createPublisher(this, flowName).asInstanceOf[Publisher[O]] - else { - val opsSize = ops.size - val opProcessor = processorForNode(ops.head, flowName, opsSize) - val topSubscriber = processorChain(opProcessor, ops.tail, flowName, opsSize - 1) - publisherNode.createPublisher(this, flowName).subscribe(topSubscriber.asInstanceOf[Subscriber[I]]) - opProcessor.asInstanceOf[Publisher[O]] - } + val (sourcePublisher, sourceValue) = source.materialize(this, flowName) + val p = + if (ops.isEmpty) sourcePublisher.asInstanceOf[Publisher[Out]] + else { + val opsSize = ops.size + val opProcessor = processorForNode(ops.head, flowName, opsSize) + val topSubscriber = processorChain(opProcessor, ops.tail, flowName, opsSize - 1) + sourcePublisher.subscribe(topSubscriber.asInstanceOf[Subscriber[In]]) + opProcessor.asInstanceOf[Publisher[Out]] + } + val sinkValue = sink.attach(p, this) + new MaterializedFlow(source, sourceValue, sink, sinkValue) } + override def identityProcessor[I](flowName: String): Processor[I, I] = + processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[I, I]] + private val identityTransform = Transform("identity", () ⇒ new Transformer[Any, Any] { override def onNext(element: Any) = List(element) }) - def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { + private def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { val impl = actorOf(ActorProcessorFactory.props(settings, op), s"$flowName-$n-${op.name}") ActorProcessorFactory(impl) } - def actorOf(props: Props, name: String): ActorRef = supervisor match { + private def actorOf(props: Props, name: String): ActorRef = supervisor match { case ref: LocalActorRef ⇒ ref.underlying.attachChild(props, name, systemService = false) case ref: RepointableActorRef ⇒ @@ -115,6 +110,9 @@ private[akka] case class ActorBasedFlowMaterializer( throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${supervisor.getClass.getName}]") } + def actorPublisher[I](props: Props, name: String, equalityValue: Option[AnyRef]): Publisher[I] = + ActorPublisher[I](actorOf(props, name), equalityValue) + } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index e6e4a90208..27a6c9b05e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -11,116 +11,190 @@ import org.reactivestreams.Subscriber import akka.stream.Transformer import akka.stream.impl.BlackholeSubscriber import akka.stream.impl2.Ast._ +import scala.annotation.unchecked.uncheckedVariance +import akka.stream.impl.BlackholeSubscriber +import scala.concurrent.Promise +import akka.stream.impl.EmptyPublisher +import akka.stream.impl.IterablePublisher +import akka.stream.impl2.ActorBasedFlowMaterializer sealed trait Flow object FlowFrom { /** - * Helper to create `Flow` without [[Input]]. + * Helper to create `Flow` without [[Source]]. * Example usage: `FlowFrom[Int]` */ def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil) /** - * Helper to create `Flow` with Input from `Iterable`. + * Helper to create `Flow` with [[Source]] from `Iterable`. * Example usage: `FlowFrom(Seq(1,2,3))` */ - def apply[T](i: immutable.Iterable[T]): PublisherFlow[T, T] = FlowFrom[T].withInput(IterableIn(i)) + def apply[T](i: immutable.Iterable[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IterableSource(i)) /** - * Helper to create `Flow` with [[Input]] from `Publisher`. + * Helper to create `Flow` with [[Source]] from `Publisher`. */ - def apply[T](p: Publisher[T]): PublisherFlow[T, T] = FlowFrom[T].withInput(PublisherIn(p)) + def apply[T](p: Publisher[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(PublisherSource(p)) } -trait Input[-In] +trait Source[-In] { + def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) @uncheckedVariance +} /** * Default input. * Allows to materialize a Flow with this input to Subscriber. */ -final case class SubscriberIn[-In]() extends Input[In] { - def subscriber[I <: In]: Subscriber[I] = ??? +final case class SubscriberSource[In]() extends Source[In] { + override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) = { + val identityProcessor = materializer.identityProcessor[In](flowName) + (identityProcessor.asInstanceOf[Publisher[In]], identityProcessor.asInstanceOf[Subscriber[In]]) + } + + def subscriber[I <: In](m: MaterializedSource): Subscriber[I] = + m.getSourceFor(this).asInstanceOf[Subscriber[I]] } /** - * Input from Publisher. + * [[Source]] from `Publisher`. */ -final case class PublisherIn[-In](p: Publisher[_ >: In]) extends Input[In] +final case class PublisherSource[In](p: Publisher[_ >: In]) extends Source[In] { + override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) = + (p.asInstanceOf[Publisher[In]], p) +} /** - * Input from Iterable + * [[Source]] from `Iterable` * * Changing In from Contravariant to Covariant is needed because Iterable[+A]. - * But this brakes IterableIn variance and we get IterableIn(Seq(1,2,3)): IterableIn[Any] + * But this brakes IterableSource variance and we get IterableSource(Seq(1,2,3)): IterableSource[Any] */ -final case class IterableIn[-In](i: immutable.Iterable[_ >: In]) extends Input[In] +final case class IterableSource[In](iterable: immutable.Iterable[_ >: In]) extends Source[In] { + override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) = { + val p = + if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[In]] + else materializer match { + case m: ActorBasedFlowMaterializer ⇒ + m.actorPublisher(IterablePublisher.props(iterable, materializer.settings), + name = s"$flowName-0-iterable", Some(iterable)) + case other ⇒ + throw new IllegalArgumentException(s"IterableSource requires ActorBasedFlowMaterializer, got [${other.getClass.getName}]") + } + (p.asInstanceOf[Publisher[In]], iterable) + } +} /** - * Input from Future + * [[Source]] from `Future` * * Changing In from Contravariant to Covariant is needed because Future[+A]. - * But this brakes FutureIn variance and we get FutureIn(Future{1}): FutureIn[Any] + * But this brakes FutureSource variance and we get FutureSource(Future{1}): FutureSource[Any] */ -final case class FutureIn[-In](f: Future[_ >: In]) extends Input[In] +final case class FutureSource[In](f: Future[_ >: In]) extends Source[In] { + override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) = ??? +} -trait Output[+Out] +trait Sink[+Out] { + def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef +} /** * Default output. * Allows to materialize a Flow with this output to Publisher. */ -final case class PublisherOut[+Out]() extends Output[Out] { - def publisher[O >: Out]: Publisher[O] = ??? +final case class PublisherSink[+Out]() extends Sink[Out] { + def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = flowPublisher + def publisher[O >: Out](m: MaterializedSink): Publisher[O] = m.getSinkFor(this).asInstanceOf[Publisher[O]] } -final case class BlackholeOut[+Out]() extends Output[Out] { - def publisher[O >: Out]: Publisher[O] = ??? +final case class BlackholeSink[+Out]() extends Sink[Out] { + override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = { + val s = new BlackholeSubscriber[Out](materializer.settings.maxInputBufferSize) + flowPublisher.subscribe(s) + s + } } /** - * Output to a Subscriber. + * [[Sink]] to a Subscriber. */ -final case class SubscriberOut[+Out](s: Subscriber[_ <: Out]) extends Output[Out] +final case class SubscriberSink[+Out](subscriber: Subscriber[_ <: Out]) extends Sink[Out] { + override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = { + flowPublisher.subscribe(subscriber.asInstanceOf[Subscriber[Out]]) + subscriber + } +} + +/** + * INTERNAL API + */ +private[akka] object ForeachSink { + private val ListOfUnit = List(()) +} + +/** + * Foreach output. Invokes the given function for each element. Completes the [[#future]] when + * all elements processed, or stream failed. + */ +final case class ForeachSink[Out](f: Out ⇒ Unit) extends Sink[Out] { // FIXME variance? + override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = { + val promise = Promise[Unit]() + FlowFrom(flowPublisher).transform("foreach", () ⇒ new Transformer[Out, Unit] { + override def onNext(in: Out) = { f(in); Nil } + override def onTermination(e: Option[Throwable]) = { + e match { + case None ⇒ promise.success(()) + case Some(e) ⇒ promise.failure(e) + } + Nil + } + }).consume()(materializer) + promise.future + } + def future(m: MaterializedSink): Future[Unit] = m.getSinkFor(this).asInstanceOf[Future[Unit]] +} /** * Fold output. Reduces output stream according to the given fold function. */ -final case class FoldOut[T, +Out](zero: T)(f: (T, Out) ⇒ T) extends Output[Out] { +final case class FoldSink[T, +Out](zero: T)(f: (T, Out) ⇒ T) extends Sink[Out] { + override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = ??? def future: Future[T] = ??? } /** - * Operations with a Flow which has open (no attached) Input. + * Operations with a Flow which has no attached [[Source]]. * * No Out type parameter would be useful for Graph signatures, but we need it here - * for `withInput` and `prependTransform` methods. + * for `withSource` and `prependTransform` methods. */ -sealed trait HasOpenInput[-In, +Out] extends Flow { - type Repr[-In, +Out] <: HasOpenInput[In, Out] - type AfterCloseInput[-In, +Out] <: Flow +sealed trait HasNoSource[-In, +Out] extends Flow { + type Repr[-In, +Out] <: HasNoSource[In, Out] + type AfterAttachingSource[-In, +Out] <: Flow - def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] + def withSource[I <: In](in: Source[I]): AfterAttachingSource[I, Out] def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] - def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] + def prepend[T](f: FlowWithSource[T, In]): Repr[T, Out]#AfterAttachingSource[T, Out] } /** - * Operations with a Flow which has open (no attached) Output. + * Operations with a Flow which has no attached [[Sink]]. * * No In type parameter would be useful for Graph signatures, but we need it here - * for `withOutput`. + * for `withSink`. */ -trait HasOpenOutput[-In, +Out] extends Flow { - type Repr[-In, +Out] <: HasOpenOutput[In, Out] - type AfterCloseOutput[-In, +Out] <: Flow +trait HasNoSink[-In, +Out] extends Flow { + type Repr[-In, +Out] <: HasNoSink[In, Out] + type AfterAttachingSink[-In, +Out] <: Flow // Storing ops in reverse order protected def andThen[U](op: AstNode): Repr[In, U] - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] + def withSink[O >: Out](out: Sink[O]): AfterAttachingSink[In, O] def map[T](f: Out ⇒ T): Repr[In, T] = transform("map", () ⇒ new Transformer[Out, T] { @@ -132,87 +206,110 @@ trait HasOpenOutput[-In, +Out] extends Flow { } def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] - def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] + def append[T](f: FlowWithSink[Out, T]): Repr[In, T]#AfterAttachingSink[In, T] + } /** * Flow without attached input and without attached output, can be used as a `Processor`. */ -final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends HasOpenOutput[In, Out] with HasOpenInput[In, Out] { +final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends HasNoSink[In, Out] with HasNoSource[In, Out] { override type Repr[-In, +Out] = ProcessorFlow[In, Out] - type AfterCloseOutput[-In, +Out] = SubscriberFlow[In, Out] - type AfterCloseInput[-In, +Out] = PublisherFlow[In, Out] + type AfterAttachingSink[-In, +Out] = FlowWithSink[In, Out] + type AfterAttachingSource[-In, +Out] = FlowWithSource[In, Out] override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops) - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = SubscriberFlow(out, ops) - def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = PublisherFlow(in, ops) + override def withSink[O >: Out](out: Sink[O]): AfterAttachingSink[In, O] = FlowWithSink(out, ops) + override def withSource[I <: In](in: Source[I]): AfterAttachingSource[I, Out] = FlowWithSource(in, ops) override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] = ProcessorFlow(ops ::: f.ops) - override def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = - PublisherFlow(f.input, ops ::: f.ops) + override def prepend[T](f: FlowWithSource[T, In]): Repr[T, Out]#AfterAttachingSource[T, Out] = + FlowWithSource(f.input, ops ::: f.ops) override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = ProcessorFlow(f.ops ++: ops) - override def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = - SubscriberFlow(f.output, f.ops ++: ops) + override def append[T](f: FlowWithSink[Out, T]): Repr[In, T]#AfterAttachingSink[In, T] = + FlowWithSink(f.output, f.ops ++: ops) } /** * Flow with attached output, can be used as a `Subscriber`. */ -final case class SubscriberFlow[-In, +Out](output: Output[Out], ops: List[AstNode]) extends HasOpenInput[In, Out] { - type Repr[-In, +Out] = SubscriberFlow[In, Out] - type AfterCloseInput[-In, +Out] = RunnableFlow[In, Out] +final case class FlowWithSink[-In, +Out](output: Sink[Out], ops: List[AstNode]) extends HasNoSource[In, Out] { + type Repr[-In, +Out] = FlowWithSink[In, Out] + type AfterAttachingSource[-In, +Out] = RunnableFlow[In, Out] - def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = RunnableFlow(in, output, ops) - def withoutOutput: ProcessorFlow[In, Out] = ProcessorFlow(ops) + override def withSource[I <: In](in: Source[I]): AfterAttachingSource[I, Out] = RunnableFlow(in, output, ops) + def withoutSink: ProcessorFlow[In, Out] = ProcessorFlow(ops) override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] = - SubscriberFlow(output, ops ::: f.ops) - override def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = + FlowWithSink(output, ops ::: f.ops) + override def prepend[T](f: FlowWithSource[T, In]): Repr[T, Out]#AfterAttachingSource[T, Out] = RunnableFlow(f.input, output, ops ::: f.ops) + + def toSubscriber[I <: In]()(implicit materializer: FlowMaterializer): Subscriber[I] = { + val subIn = SubscriberSource[I]() + val mf = withSource(subIn).run() + subIn.subscriber(mf) + } } /** * Flow with attached input, can be used as a `Publisher`. */ -final case class PublisherFlow[-In, +Out](input: Input[In], ops: List[AstNode]) extends HasOpenOutput[In, Out] { - override type Repr[-In, +Out] = PublisherFlow[In, Out] - type AfterCloseOutput[-In, +Out] = RunnableFlow[In, Out] +final case class FlowWithSource[-In, +Out](input: Source[In], ops: List[AstNode]) extends HasNoSink[In, Out] { + override type Repr[-In, +Out] = FlowWithSource[In, Out] + type AfterAttachingSink[-In, +Out] = RunnableFlow[In, Out] override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops) - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = RunnableFlow(input, out, ops) - def withoutInput: ProcessorFlow[In, Out] = ProcessorFlow(ops) + override def withSink[O >: Out](out: Sink[O]): AfterAttachingSink[In, O] = RunnableFlow(input, out, ops) + def withoutSource: ProcessorFlow[In, Out] = ProcessorFlow(ops) - override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = PublisherFlow(input, f.ops ++: ops) - override def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = + override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = FlowWithSource(input, f.ops ++: ops) + override def append[T](f: FlowWithSink[Out, T]): Repr[In, T]#AfterAttachingSink[In, T] = RunnableFlow(input, f.output, f.ops ++: ops) + def toPublisher[U >: Out]()(implicit materializer: FlowMaterializer): Publisher[U] = { + val pubOut = PublisherSink[Out]() + val mf = withSink(pubOut).run() + pubOut.publisher(mf) + } + + def publishTo(subscriber: Subscriber[_ >: Out])(implicit materializer: FlowMaterializer): Unit = + toPublisher().subscribe(subscriber.asInstanceOf[Subscriber[Out]]) + + def consume()(implicit materializer: FlowMaterializer): Unit = + withSink(BlackholeSink()).run() + } /** * Flow with attached input and output, can be executed. */ -final case class RunnableFlow[-In, +Out](input: Input[In], output: Output[Out], ops: List[AstNode]) extends Flow { - def withoutOutput: PublisherFlow[In, Out] = PublisherFlow(input, ops) - def withoutInput: SubscriberFlow[In, Out] = SubscriberFlow(output, ops) - - // FIXME - def run()(implicit materializer: FlowMaterializer): Unit = - produceTo(new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize)) - - // FIXME replace with run and input/output factories - def toPublisher[U >: Out]()(implicit materializer: FlowMaterializer): Publisher[U] = - input match { - case PublisherIn(p) ⇒ materializer.toPublisher(ExistingPublisher(p), ops) - case IterableIn(iter) ⇒ materializer.toPublisher(IterablePublisherNode(iter), ops) - case _ ⇒ ??? - } - - def produceTo(subscriber: Subscriber[_ >: Out])(implicit materializer: FlowMaterializer): Unit = - toPublisher().subscribe(subscriber.asInstanceOf[Subscriber[Out]]) +final case class RunnableFlow[-In, +Out](input: Source[In], output: Sink[Out], ops: List[AstNode]) extends Flow { + def withoutSink: FlowWithSource[In, Out] = FlowWithSource(input, ops) + def withoutSource: FlowWithSink[In, Out] = FlowWithSink(output, ops) + def run()(implicit materializer: FlowMaterializer): MaterializedFlow = + materializer.materialize(input, output, ops) } +class MaterializedFlow(sourceKey: AnyRef, matSource: AnyRef, sinkKey: AnyRef, matSink: AnyRef) extends MaterializedSource with MaterializedSink { + override def getSourceFor(key: AnyRef): AnyRef = + if (key == sourceKey) matSource + else throw new IllegalArgumentException(s"Source key [$key] doesn't match the source [$sourceKey] of this flow") + + def getSinkFor(key: AnyRef): AnyRef = + if (key == sinkKey) matSink + else throw new IllegalArgumentException(s"Sink key [$key] doesn't match the sink [$sinkKey] of this flow") +} + +trait MaterializedSource { + def getSourceFor(sourceKey: AnyRef): AnyRef +} + +trait MaterializedSink { + def getSinkFor(sinkKey: AnyRef): AnyRef +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala index 8bb91e81d5..9f97c947b9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala @@ -15,6 +15,7 @@ import akka.actor.ActorContext import akka.stream.impl2.StreamSupervisor import akka.stream.impl2.FlowNameCounter import akka.stream.MaterializerSettings +import org.reactivestreams.Processor object FlowMaterializer { @@ -22,8 +23,7 @@ object FlowMaterializer { * Scala API: Creates a FlowMaterializer which will execute every step of a transformation * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) - * will be used to create these actors, therefore it is *forbidden* to pass this object - * to another actor if the factory is an ActorContext. + * will be used to create one actor that in turn creates actors for the transformation steps. * * The `namePrefix` is used as the first part of the names of the actors running * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of @@ -49,8 +49,7 @@ object FlowMaterializer { * Java API: Creates a FlowMaterializer which will execute every step of a transformation * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) - * will be used to create these actors, therefore it is *forbidden* to pass this object - * to another actor if the factory is an ActorContext. + * will be used to create one actor that in turn creates actors for the transformation steps. */ def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer = apply(settings)(context) @@ -75,7 +74,12 @@ abstract class FlowMaterializer(val settings: MaterializerSettings) { * INTERNAL API * ops are stored in reverse order */ - private[akka] def toPublisher[I, O](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Publisher[O] + private[akka] def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow + + /** + * INTERNAL API + */ + private[akka] def identityProcessor[I](flowName: String): Processor[I, I] } diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala index 399f402cc9..a7306a2127 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2013 Typesafe Inc. + * Copyright (C) 2009-2014 Typesafe Inc. */ package akka.stream diff --git a/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala index 5f59df319f..6d5a747d09 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2013 Typesafe Inc. + * Copyright (C) 2009-2014 Typesafe Inc. */ package akka.stream diff --git a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala index a89b199edb..98044f7002 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2013 Typesafe Inc. + * Copyright (C) 2009-2014 Typesafe Inc. */ package akka.stream diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala index ec261a4a6a..41460212e7 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala @@ -11,30 +11,30 @@ import scala.concurrent.Future class FlowSpec extends AkkaSpec { - val intSeq = IterableIn(Seq(1, 2, 3)) - val strSeq = IterableIn(Seq("a", "b", "c")) + val intSeq = IterableSource(Seq(1, 2, 3)) + val strSeq = IterableSource(Seq("a", "b", "c")) import scala.concurrent.ExecutionContext.Implicits.global - val intFut = FutureIn(Future { 3 }) + val intFut = FutureSource(Future { 3 }) implicit val materializer = FlowMaterializer(MaterializerSettings(system)) "ProcessorFlow" should { "go through all states" in { val f: ProcessorFlow[Int, Int] = FlowFrom[Int] - .withInput(intSeq) - .withOutput(PublisherOut()) - .withoutInput - .withoutOutput + .withSource(intSeq) + .withSink(PublisherSink()) + .withoutSource + .withoutSink } "should not run" in { val open: ProcessorFlow[Int, Int] = FlowFrom[Int] "open.run()" shouldNot compile } - "accept IterableIn" in { - val f: PublisherFlow[Int, Int] = FlowFrom[Int].withInput(intSeq) + "accept IterableSource" in { + val f: FlowWithSource[Int, Int] = FlowFrom[Int].withSource(intSeq) } - "accept FutureIn" in { - val f: PublisherFlow[Int, Int] = FlowFrom[Int].withInput(intFut) + "accept FutureSource" in { + val f: FlowWithSource[Int, Int] = FlowFrom[Int].withSource(intFut) } "append ProcessorFlow" in { val open1: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) @@ -42,14 +42,14 @@ class FlowSpec extends AkkaSpec { val open3: ProcessorFlow[Int, Int] = open1.append(open2) "open3.run()" shouldNot compile - val closedInput: PublisherFlow[Int, Int] = open3.withInput(intSeq) - "closedInput.run()" shouldNot compile + val closedSource: FlowWithSource[Int, Int] = open3.withSource(intSeq) + "closedSource.run()" shouldNot compile - val closedOutput: SubscriberFlow[Int, Int] = open3.withOutput(PublisherOut()) - "closedOutput.run()" shouldNot compile + val closedSink: FlowWithSink[Int, Int] = open3.withSink(PublisherSink()) + "closedSink.run()" shouldNot compile - closedInput.withOutput(PublisherOut()).run() - closedOutput.withInput(intSeq).run() + closedSource.withSink(PublisherSink()).run() + closedSink.withSource(intSeq).run() } "prepend ProcessorFlow" in { val open1: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) @@ -57,89 +57,89 @@ class FlowSpec extends AkkaSpec { val open3: ProcessorFlow[String, String] = open1.prepend(open2) "open3.run()" shouldNot compile - val closedInput: PublisherFlow[String, String] = open3.withInput(strSeq) - "closedInput.run()" shouldNot compile + val closedSource: FlowWithSource[String, String] = open3.withSource(strSeq) + "closedSource.run()" shouldNot compile - val closedOutput: SubscriberFlow[String, String] = open3.withOutput(PublisherOut()) - "closedOutput.run()" shouldNot compile + val closedSink: FlowWithSink[String, String] = open3.withSink(PublisherSink()) + "closedSink.run()" shouldNot compile - closedInput.withOutput(PublisherOut()).run - closedOutput.withInput(strSeq).run + closedSource.withSink(PublisherSink()).run + closedSink.withSource(strSeq).run } - "append SubscriberFlow" in { + "append FlowWithSink" in { val open: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) - val closedOutput: SubscriberFlow[String, Int] = FlowFrom[String].map(_.hashCode).withOutput(PublisherOut()) - val appended: SubscriberFlow[Int, Int] = open.append(closedOutput) + val closedSink: FlowWithSink[String, Int] = FlowFrom[String].map(_.hashCode).withSink(PublisherSink()) + val appended: FlowWithSink[Int, Int] = open.append(closedSink) "appended.run()" shouldNot compile "appended.toFuture" shouldNot compile - appended.withInput(intSeq).run + appended.withSource(intSeq).run } - "prepend PublisherFlow" in { + "prepend FlowWithSource" in { val open: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) - val closedInput: PublisherFlow[String, Int] = FlowFrom[String].map(_.hashCode).withInput(strSeq) - val prepended: PublisherFlow[String, String] = open.prepend(closedInput) + val closedSource: FlowWithSource[String, Int] = FlowFrom[String].map(_.hashCode).withSource(strSeq) + val prepended: FlowWithSource[String, String] = open.prepend(closedSource) "prepended.run()" shouldNot compile - "prepended.withInput(strSeq)" shouldNot compile - prepended.withOutput(PublisherOut()).run + "prepended.withSource(strSeq)" shouldNot compile + prepended.withSink(PublisherSink()).run } } - "SubscriberFlow" should { - val openInput: SubscriberFlow[Int, String] = - FlowFrom[Int].map(_.toString).withOutput(PublisherOut()) - "accept Input" in { - openInput.withInput(intSeq) + "FlowWithSink" should { + val openSource: FlowWithSink[Int, String] = + FlowFrom[Int].map(_.toString).withSink(PublisherSink()) + "accept Source" in { + openSource.withSource(intSeq) } - "drop Output" in { - openInput.withoutOutput + "drop Sink" in { + openSource.withoutSink } - "not drop Input" in { - "openInput.withoutInput" shouldNot compile + "not drop Source" in { + "openSource.withoutSource" shouldNot compile } - "not accept Output" in { - "openInput.ToFuture" shouldNot compile + "not accept Sink" in { + "openSource.ToFuture" shouldNot compile } "not run()" in { - "openInput.run()" shouldNot compile + "openSource.run()" shouldNot compile } } - "PublisherFlow" should { - val openOutput: PublisherFlow[Int, String] = + "FlowWithSource" should { + val openSink: FlowWithSource[Int, String] = FlowFrom(Seq(1, 2, 3)).map(_.toString) - "accept Output" in { - openOutput.withOutput(PublisherOut()) + "accept Sink" in { + openSink.withSink(PublisherSink()) } - "drop Input" in { - openOutput.withoutInput + "drop Source" in { + openSink.withoutSource } - "not drop Output" in { - "openOutput.withoutOutput" shouldNot compile + "not drop Sink" in { + "openSink.withoutSink" shouldNot compile } - "not accept Input" in { - "openOutput.withInput(intSeq)" shouldNot compile + "not accept Source" in { + "openSink.withSource(intSeq)" shouldNot compile } "not run()" in { - "openOutput.run()" shouldNot compile + "openSink.run()" shouldNot compile } } "RunnableFlow" should { val closed: RunnableFlow[Int, String] = - FlowFrom(Seq(1, 2, 3)).map(_.toString).withOutput(PublisherOut()) + FlowFrom(Seq(1, 2, 3)).map(_.toString).withSink(PublisherSink()) "run" in { closed.run() } - "drop Input" in { - closed.withoutInput + "drop Source" in { + closed.withoutSource } - "drop Output" in { - closed.withoutOutput + "drop Sink" in { + closed.withoutSink } - "not accept Input" in { - "closed.withInput(intSeq)" shouldNot compile + "not accept Source" in { + "closed.withSource(intSeq)" shouldNot compile } - "not accept Output" in { + "not accept Sink" in { "closed.ToFuture" shouldNot compile } } diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala index fb25ddb227..023c76ebe9 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala @@ -23,7 +23,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "A Flow with transform operations" must { "produce one-to-one transformation as expected" in { - val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List(1, 2, 3)).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 @@ -32,7 +32,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d List(tot) } }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -46,7 +46,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "produce one-to-several transformation as expected" in { - val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List(1, 2, 3)).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 @@ -55,7 +55,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d Vector.fill(elem)(tot) } }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -72,7 +72,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "produce dropping transformation as expected" in { - val p = FlowFrom(List(1, 2, 3, 4)).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List(1, 2, 3, 4)).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 @@ -85,7 +85,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } } }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -99,7 +99,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "produce multi-step transformation as expected" in { - val p = FlowFrom(List("a", "bc", "def")).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List("a", "bc", "def")).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[String, Int] { var concat = "" @@ -115,7 +115,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d List(tot) } }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val c1 = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c1) val sub1 = c1.expectSubscription() @@ -138,7 +138,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "invoke onComplete when done" in { - val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List("a")).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[String, String] { var s = "" @@ -148,7 +148,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } override def onTermination(e: Option[Throwable]) = List(s + "B") }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val c = StreamTestKit.SubscriberProbe[String]() p2.subscribe(c) val s = c.expectSubscription() @@ -159,7 +159,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "invoke cleanup when done" in { val cleanupProbe = TestProbe() - val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List("a")).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[String, String] { var s = "" @@ -170,7 +170,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d override def onTermination(e: Option[Throwable]) = List(s + "B") override def cleanup() = cleanupProbe.ref ! s }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val c = StreamTestKit.SubscriberProbe[String]() p2.subscribe(c) val s = c.expectSubscription() @@ -182,7 +182,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "invoke cleanup when done consume" in { val cleanupProbe = TestProbe() - val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List("a")).toPublisher() FlowFrom(p). transform("transform", () ⇒ new Transformer[String, String] { var s = "x" @@ -192,13 +192,13 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } override def cleanup() = cleanupProbe.ref ! s }). - withOutput(BlackholeOut()).run() + withSink(BlackholeSink()).run() cleanupProbe.expectMsg("a") } "invoke cleanup when done after error" in { val cleanupProbe = TestProbe() - val p = FlowFrom(List("a", "b", "c")).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List("a", "b", "c")).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[String, String] { var s = "" @@ -214,7 +214,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d override def onTermination(e: Option[Throwable]) = List(s + "B") override def cleanup() = cleanupProbe.ref ! s }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val c = StreamTestKit.SubscriberProbe[String]() p2.subscribe(c) val s = c.expectSubscription() @@ -236,7 +236,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } override def isComplete = s == "1" }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val proc = p.expectSubscription val c = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c) @@ -263,7 +263,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d override def onTermination(e: Option[Throwable]) = List(s.length + 10) override def cleanup() = cleanupProbe.ref ! s }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val proc = p.expectSubscription val c = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c) @@ -279,7 +279,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "report error when exception is thrown" in { - val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List(1, 2, 3)).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = { @@ -290,7 +290,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } } }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -304,12 +304,12 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "support cancel as expected" in { - val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List(1, 2, 3)).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = List(elem, elem) }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -323,13 +323,13 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "support producing elements from empty inputs" in { - val p = FlowFrom(List.empty[Int]).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List.empty[Int]).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = Nil override def onTermination(e: Option[Throwable]) = List(1, 2, 3) }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -363,7 +363,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d case _ ⇒ Nil } } - }).withOutput(PublisherOut()).produceTo(subscriber) + }).publishTo(subscriber) val subscription = subscriber.expectSubscription() subscription.request(10) @@ -383,16 +383,16 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d count += 1 List(count) } - }).withOutput(PublisherOut()) + }) val s1 = StreamTestKit.SubscriberProbe[Int]() - flow.produceTo(s1) + flow.publishTo(s1) s1.expectSubscription().request(3) s1.expectNext(1, 2, 3) s1.expectComplete() val s2 = StreamTestKit.SubscriberProbe[Int]() - flow.produceTo(s2) + flow.publishTo(s2) s2.expectSubscription().request(3) s2.expectNext(1, 2, 3) s2.expectComplete()