=str #15755 #15756 First stab at input/output factories

* New naming based on Source and Sink
This commit is contained in:
Patrik Nordwall 2014-09-01 13:12:18 +02:00
parent 0df3f572e1
commit 0046bebdfe
10 changed files with 306 additions and 209 deletions

View file

@ -14,8 +14,7 @@ object FlowMaterializer {
* Scala API: Creates a FlowMaterializer which will execute every step of a transformation * Scala API: Creates a FlowMaterializer which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
* will be used to create these actors, therefore it is *forbidden* to pass this object * will be used to create one actor that in turn creates actors for the transformation steps.
* to another actor if the factory is an ActorContext.
* *
* The materializer's [[akka.stream.MaterializerSettings]] will be obtained from the * The materializer's [[akka.stream.MaterializerSettings]] will be obtained from the
* configuration of the `context`'s underlying [[akka.actor.ActorSystem]]. * configuration of the `context`'s underlying [[akka.actor.ActorSystem]].
@ -83,8 +82,7 @@ object FlowMaterializer {
* Java API: Creates a FlowMaterializer which will execute every step of a transformation * Java API: Creates a FlowMaterializer which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
* will be used to create these actors, therefore it is *forbidden* to pass this object * will be used to create one actor that in turn creates actors for the transformation steps.
* to another actor if the factory is an ActorContext.
*/ */
def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer = def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer =
apply(Option(settings), None)(context) apply(Option(settings), None)(context)

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.stream.impl package akka.stream.impl

View file

@ -21,6 +21,9 @@ import akka.stream.impl.IterablePublisher
import akka.stream.impl.TransformProcessorImpl import akka.stream.impl.TransformProcessorImpl
import akka.stream.impl.ActorProcessor import akka.stream.impl.ActorProcessor
import akka.stream.impl.ExposedPublisher import akka.stream.impl.ExposedPublisher
import akka.stream.scaladsl2.Source
import akka.stream.scaladsl2.Sink
import akka.stream.scaladsl2.MaterializedFlow
/** /**
* INTERNAL API * INTERNAL API
@ -32,21 +35,6 @@ private[akka] object Ast {
case class Transform(name: String, mkTransformer: () Transformer[Any, Any]) extends AstNode case class Transform(name: String, mkTransformer: () Transformer[Any, Any]) extends AstNode
trait PublisherNode[I] {
private[akka] def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I]
}
final case class ExistingPublisher[I](publisher: Publisher[I]) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String) = publisher
}
final case class IterablePublisherNode[I](iterable: immutable.Iterable[I]) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]]
else ActorPublisher[I](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings),
name = s"$flowName-0-iterable"), Some(iterable))
}
} }
/** /**
@ -78,29 +66,36 @@ private[akka] case class ActorBasedFlowMaterializer(
} }
// Ops come in reverse order // Ops come in reverse order
override def toPublisher[I, O](publisherNode: PublisherNode[I], ops: List[AstNode]): Publisher[O] = { override def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow = {
val flowName = createFlowName() val flowName = createFlowName()
if (ops.isEmpty) publisherNode.createPublisher(this, flowName).asInstanceOf[Publisher[O]] val (sourcePublisher, sourceValue) = source.materialize(this, flowName)
else { val p =
val opsSize = ops.size if (ops.isEmpty) sourcePublisher.asInstanceOf[Publisher[Out]]
val opProcessor = processorForNode(ops.head, flowName, opsSize) else {
val topSubscriber = processorChain(opProcessor, ops.tail, flowName, opsSize - 1) val opsSize = ops.size
publisherNode.createPublisher(this, flowName).subscribe(topSubscriber.asInstanceOf[Subscriber[I]]) val opProcessor = processorForNode(ops.head, flowName, opsSize)
opProcessor.asInstanceOf[Publisher[O]] val topSubscriber = processorChain(opProcessor, ops.tail, flowName, opsSize - 1)
} sourcePublisher.subscribe(topSubscriber.asInstanceOf[Subscriber[In]])
opProcessor.asInstanceOf[Publisher[Out]]
}
val sinkValue = sink.attach(p, this)
new MaterializedFlow(source, sourceValue, sink, sinkValue)
} }
override def identityProcessor[I](flowName: String): Processor[I, I] =
processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[I, I]]
private val identityTransform = Transform("identity", () private val identityTransform = Transform("identity", ()
new Transformer[Any, Any] { new Transformer[Any, Any] {
override def onNext(element: Any) = List(element) override def onNext(element: Any) = List(element)
}) })
def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { private def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
val impl = actorOf(ActorProcessorFactory.props(settings, op), s"$flowName-$n-${op.name}") val impl = actorOf(ActorProcessorFactory.props(settings, op), s"$flowName-$n-${op.name}")
ActorProcessorFactory(impl) ActorProcessorFactory(impl)
} }
def actorOf(props: Props, name: String): ActorRef = supervisor match { private def actorOf(props: Props, name: String): ActorRef = supervisor match {
case ref: LocalActorRef case ref: LocalActorRef
ref.underlying.attachChild(props, name, systemService = false) ref.underlying.attachChild(props, name, systemService = false)
case ref: RepointableActorRef case ref: RepointableActorRef
@ -115,6 +110,9 @@ private[akka] case class ActorBasedFlowMaterializer(
throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${supervisor.getClass.getName}]") throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${supervisor.getClass.getName}]")
} }
def actorPublisher[I](props: Props, name: String, equalityValue: Option[AnyRef]): Publisher[I] =
ActorPublisher[I](actorOf(props, name), equalityValue)
} }
/** /**

View file

@ -11,116 +11,190 @@ import org.reactivestreams.Subscriber
import akka.stream.Transformer import akka.stream.Transformer
import akka.stream.impl.BlackholeSubscriber import akka.stream.impl.BlackholeSubscriber
import akka.stream.impl2.Ast._ import akka.stream.impl2.Ast._
import scala.annotation.unchecked.uncheckedVariance
import akka.stream.impl.BlackholeSubscriber
import scala.concurrent.Promise
import akka.stream.impl.EmptyPublisher
import akka.stream.impl.IterablePublisher
import akka.stream.impl2.ActorBasedFlowMaterializer
sealed trait Flow sealed trait Flow
object FlowFrom { object FlowFrom {
/** /**
* Helper to create `Flow` without [[Input]]. * Helper to create `Flow` without [[Source]].
* Example usage: `FlowFrom[Int]` * Example usage: `FlowFrom[Int]`
*/ */
def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil) def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil)
/** /**
* Helper to create `Flow` with Input from `Iterable`. * Helper to create `Flow` with [[Source]] from `Iterable`.
* Example usage: `FlowFrom(Seq(1,2,3))` * Example usage: `FlowFrom(Seq(1,2,3))`
*/ */
def apply[T](i: immutable.Iterable[T]): PublisherFlow[T, T] = FlowFrom[T].withInput(IterableIn(i)) def apply[T](i: immutable.Iterable[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IterableSource(i))
/** /**
* Helper to create `Flow` with [[Input]] from `Publisher`. * Helper to create `Flow` with [[Source]] from `Publisher`.
*/ */
def apply[T](p: Publisher[T]): PublisherFlow[T, T] = FlowFrom[T].withInput(PublisherIn(p)) def apply[T](p: Publisher[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(PublisherSource(p))
} }
trait Input[-In] trait Source[-In] {
def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) @uncheckedVariance
}
/** /**
* Default input. * Default input.
* Allows to materialize a Flow with this input to Subscriber. * Allows to materialize a Flow with this input to Subscriber.
*/ */
final case class SubscriberIn[-In]() extends Input[In] { final case class SubscriberSource[In]() extends Source[In] {
def subscriber[I <: In]: Subscriber[I] = ??? override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) = {
val identityProcessor = materializer.identityProcessor[In](flowName)
(identityProcessor.asInstanceOf[Publisher[In]], identityProcessor.asInstanceOf[Subscriber[In]])
}
def subscriber[I <: In](m: MaterializedSource): Subscriber[I] =
m.getSourceFor(this).asInstanceOf[Subscriber[I]]
} }
/** /**
* Input from Publisher. * [[Source]] from `Publisher`.
*/ */
final case class PublisherIn[-In](p: Publisher[_ >: In]) extends Input[In] final case class PublisherSource[In](p: Publisher[_ >: In]) extends Source[In] {
override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) =
(p.asInstanceOf[Publisher[In]], p)
}
/** /**
* Input from Iterable * [[Source]] from `Iterable`
* *
* Changing In from Contravariant to Covariant is needed because Iterable[+A]. * Changing In from Contravariant to Covariant is needed because Iterable[+A].
* But this brakes IterableIn variance and we get IterableIn(Seq(1,2,3)): IterableIn[Any] * But this brakes IterableSource variance and we get IterableSource(Seq(1,2,3)): IterableSource[Any]
*/ */
final case class IterableIn[-In](i: immutable.Iterable[_ >: In]) extends Input[In] final case class IterableSource[In](iterable: immutable.Iterable[_ >: In]) extends Source[In] {
override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) = {
val p =
if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[In]]
else materializer match {
case m: ActorBasedFlowMaterializer
m.actorPublisher(IterablePublisher.props(iterable, materializer.settings),
name = s"$flowName-0-iterable", Some(iterable))
case other
throw new IllegalArgumentException(s"IterableSource requires ActorBasedFlowMaterializer, got [${other.getClass.getName}]")
}
(p.asInstanceOf[Publisher[In]], iterable)
}
}
/** /**
* Input from Future * [[Source]] from `Future`
* *
* Changing In from Contravariant to Covariant is needed because Future[+A]. * Changing In from Contravariant to Covariant is needed because Future[+A].
* But this brakes FutureIn variance and we get FutureIn(Future{1}): FutureIn[Any] * But this brakes FutureSource variance and we get FutureSource(Future{1}): FutureSource[Any]
*/ */
final case class FutureIn[-In](f: Future[_ >: In]) extends Input[In] final case class FutureSource[In](f: Future[_ >: In]) extends Source[In] {
override def materialize(materializer: FlowMaterializer, flowName: String): (Publisher[In], AnyRef) = ???
}
trait Output[+Out] trait Sink[+Out] {
def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef
}
/** /**
* Default output. * Default output.
* Allows to materialize a Flow with this output to Publisher. * Allows to materialize a Flow with this output to Publisher.
*/ */
final case class PublisherOut[+Out]() extends Output[Out] { final case class PublisherSink[+Out]() extends Sink[Out] {
def publisher[O >: Out]: Publisher[O] = ??? def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = flowPublisher
def publisher[O >: Out](m: MaterializedSink): Publisher[O] = m.getSinkFor(this).asInstanceOf[Publisher[O]]
} }
final case class BlackholeOut[+Out]() extends Output[Out] { final case class BlackholeSink[+Out]() extends Sink[Out] {
def publisher[O >: Out]: Publisher[O] = ??? override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = {
val s = new BlackholeSubscriber[Out](materializer.settings.maxInputBufferSize)
flowPublisher.subscribe(s)
s
}
} }
/** /**
* Output to a Subscriber. * [[Sink]] to a Subscriber.
*/ */
final case class SubscriberOut[+Out](s: Subscriber[_ <: Out]) extends Output[Out] final case class SubscriberSink[+Out](subscriber: Subscriber[_ <: Out]) extends Sink[Out] {
override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = {
flowPublisher.subscribe(subscriber.asInstanceOf[Subscriber[Out]])
subscriber
}
}
/**
* INTERNAL API
*/
private[akka] object ForeachSink {
private val ListOfUnit = List(())
}
/**
* Foreach output. Invokes the given function for each element. Completes the [[#future]] when
* all elements processed, or stream failed.
*/
final case class ForeachSink[Out](f: Out Unit) extends Sink[Out] { // FIXME variance?
override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = {
val promise = Promise[Unit]()
FlowFrom(flowPublisher).transform("foreach", () new Transformer[Out, Unit] {
override def onNext(in: Out) = { f(in); Nil }
override def onTermination(e: Option[Throwable]) = {
e match {
case None promise.success(())
case Some(e) promise.failure(e)
}
Nil
}
}).consume()(materializer)
promise.future
}
def future(m: MaterializedSink): Future[Unit] = m.getSinkFor(this).asInstanceOf[Future[Unit]]
}
/** /**
* Fold output. Reduces output stream according to the given fold function. * Fold output. Reduces output stream according to the given fold function.
*/ */
final case class FoldOut[T, +Out](zero: T)(f: (T, Out) T) extends Output[Out] { final case class FoldSink[T, +Out](zero: T)(f: (T, Out) T) extends Sink[Out] {
override def attach(flowPublisher: Publisher[Out] @uncheckedVariance, materializer: FlowMaterializer): AnyRef = ???
def future: Future[T] = ??? def future: Future[T] = ???
} }
/** /**
* Operations with a Flow which has open (no attached) Input. * Operations with a Flow which has no attached [[Source]].
* *
* No Out type parameter would be useful for Graph signatures, but we need it here * No Out type parameter would be useful for Graph signatures, but we need it here
* for `withInput` and `prependTransform` methods. * for `withSource` and `prependTransform` methods.
*/ */
sealed trait HasOpenInput[-In, +Out] extends Flow { sealed trait HasNoSource[-In, +Out] extends Flow {
type Repr[-In, +Out] <: HasOpenInput[In, Out] type Repr[-In, +Out] <: HasNoSource[In, Out]
type AfterCloseInput[-In, +Out] <: Flow type AfterAttachingSource[-In, +Out] <: Flow
def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] def withSource[I <: In](in: Source[I]): AfterAttachingSource[I, Out]
def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out]
def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] def prepend[T](f: FlowWithSource[T, In]): Repr[T, Out]#AfterAttachingSource[T, Out]
} }
/** /**
* Operations with a Flow which has open (no attached) Output. * Operations with a Flow which has no attached [[Sink]].
* *
* No In type parameter would be useful for Graph signatures, but we need it here * No In type parameter would be useful for Graph signatures, but we need it here
* for `withOutput`. * for `withSink`.
*/ */
trait HasOpenOutput[-In, +Out] extends Flow { trait HasNoSink[-In, +Out] extends Flow {
type Repr[-In, +Out] <: HasOpenOutput[In, Out] type Repr[-In, +Out] <: HasNoSink[In, Out]
type AfterCloseOutput[-In, +Out] <: Flow type AfterAttachingSink[-In, +Out] <: Flow
// Storing ops in reverse order // Storing ops in reverse order
protected def andThen[U](op: AstNode): Repr[In, U] protected def andThen[U](op: AstNode): Repr[In, U]
def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] def withSink[O >: Out](out: Sink[O]): AfterAttachingSink[In, O]
def map[T](f: Out T): Repr[In, T] = def map[T](f: Out T): Repr[In, T] =
transform("map", () new Transformer[Out, T] { transform("map", () new Transformer[Out, T] {
@ -132,87 +206,110 @@ trait HasOpenOutput[-In, +Out] extends Flow {
} }
def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] def append[T](f: ProcessorFlow[Out, T]): Repr[In, T]
def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] def append[T](f: FlowWithSink[Out, T]): Repr[In, T]#AfterAttachingSink[In, T]
} }
/** /**
* Flow without attached input and without attached output, can be used as a `Processor`. * Flow without attached input and without attached output, can be used as a `Processor`.
*/ */
final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends HasOpenOutput[In, Out] with HasOpenInput[In, Out] { final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends HasNoSink[In, Out] with HasNoSource[In, Out] {
override type Repr[-In, +Out] = ProcessorFlow[In, Out] override type Repr[-In, +Out] = ProcessorFlow[In, Out]
type AfterCloseOutput[-In, +Out] = SubscriberFlow[In, Out] type AfterAttachingSink[-In, +Out] = FlowWithSink[In, Out]
type AfterCloseInput[-In, +Out] = PublisherFlow[In, Out] type AfterAttachingSource[-In, +Out] = FlowWithSource[In, Out]
override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops) override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops)
def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = SubscriberFlow(out, ops) override def withSink[O >: Out](out: Sink[O]): AfterAttachingSink[In, O] = FlowWithSink(out, ops)
def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = PublisherFlow(in, ops) override def withSource[I <: In](in: Source[I]): AfterAttachingSource[I, Out] = FlowWithSource(in, ops)
override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] = override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] =
ProcessorFlow(ops ::: f.ops) ProcessorFlow(ops ::: f.ops)
override def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = override def prepend[T](f: FlowWithSource[T, In]): Repr[T, Out]#AfterAttachingSource[T, Out] =
PublisherFlow(f.input, ops ::: f.ops) FlowWithSource(f.input, ops ::: f.ops)
override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = ProcessorFlow(f.ops ++: ops) override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = ProcessorFlow(f.ops ++: ops)
override def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = override def append[T](f: FlowWithSink[Out, T]): Repr[In, T]#AfterAttachingSink[In, T] =
SubscriberFlow(f.output, f.ops ++: ops) FlowWithSink(f.output, f.ops ++: ops)
} }
/** /**
* Flow with attached output, can be used as a `Subscriber`. * Flow with attached output, can be used as a `Subscriber`.
*/ */
final case class SubscriberFlow[-In, +Out](output: Output[Out], ops: List[AstNode]) extends HasOpenInput[In, Out] { final case class FlowWithSink[-In, +Out](output: Sink[Out], ops: List[AstNode]) extends HasNoSource[In, Out] {
type Repr[-In, +Out] = SubscriberFlow[In, Out] type Repr[-In, +Out] = FlowWithSink[In, Out]
type AfterCloseInput[-In, +Out] = RunnableFlow[In, Out] type AfterAttachingSource[-In, +Out] = RunnableFlow[In, Out]
def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = RunnableFlow(in, output, ops) override def withSource[I <: In](in: Source[I]): AfterAttachingSource[I, Out] = RunnableFlow(in, output, ops)
def withoutOutput: ProcessorFlow[In, Out] = ProcessorFlow(ops) def withoutSink: ProcessorFlow[In, Out] = ProcessorFlow(ops)
override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] = override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] =
SubscriberFlow(output, ops ::: f.ops) FlowWithSink(output, ops ::: f.ops)
override def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = override def prepend[T](f: FlowWithSource[T, In]): Repr[T, Out]#AfterAttachingSource[T, Out] =
RunnableFlow(f.input, output, ops ::: f.ops) RunnableFlow(f.input, output, ops ::: f.ops)
def toSubscriber[I <: In]()(implicit materializer: FlowMaterializer): Subscriber[I] = {
val subIn = SubscriberSource[I]()
val mf = withSource(subIn).run()
subIn.subscriber(mf)
}
} }
/** /**
* Flow with attached input, can be used as a `Publisher`. * Flow with attached input, can be used as a `Publisher`.
*/ */
final case class PublisherFlow[-In, +Out](input: Input[In], ops: List[AstNode]) extends HasOpenOutput[In, Out] { final case class FlowWithSource[-In, +Out](input: Source[In], ops: List[AstNode]) extends HasNoSink[In, Out] {
override type Repr[-In, +Out] = PublisherFlow[In, Out] override type Repr[-In, +Out] = FlowWithSource[In, Out]
type AfterCloseOutput[-In, +Out] = RunnableFlow[In, Out] type AfterAttachingSink[-In, +Out] = RunnableFlow[In, Out]
override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops) override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops)
def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = RunnableFlow(input, out, ops) override def withSink[O >: Out](out: Sink[O]): AfterAttachingSink[In, O] = RunnableFlow(input, out, ops)
def withoutInput: ProcessorFlow[In, Out] = ProcessorFlow(ops) def withoutSource: ProcessorFlow[In, Out] = ProcessorFlow(ops)
override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = PublisherFlow(input, f.ops ++: ops) override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = FlowWithSource(input, f.ops ++: ops)
override def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = override def append[T](f: FlowWithSink[Out, T]): Repr[In, T]#AfterAttachingSink[In, T] =
RunnableFlow(input, f.output, f.ops ++: ops) RunnableFlow(input, f.output, f.ops ++: ops)
def toPublisher[U >: Out]()(implicit materializer: FlowMaterializer): Publisher[U] = {
val pubOut = PublisherSink[Out]()
val mf = withSink(pubOut).run()
pubOut.publisher(mf)
}
def publishTo(subscriber: Subscriber[_ >: Out])(implicit materializer: FlowMaterializer): Unit =
toPublisher().subscribe(subscriber.asInstanceOf[Subscriber[Out]])
def consume()(implicit materializer: FlowMaterializer): Unit =
withSink(BlackholeSink()).run()
} }
/** /**
* Flow with attached input and output, can be executed. * Flow with attached input and output, can be executed.
*/ */
final case class RunnableFlow[-In, +Out](input: Input[In], output: Output[Out], ops: List[AstNode]) extends Flow { final case class RunnableFlow[-In, +Out](input: Source[In], output: Sink[Out], ops: List[AstNode]) extends Flow {
def withoutOutput: PublisherFlow[In, Out] = PublisherFlow(input, ops) def withoutSink: FlowWithSource[In, Out] = FlowWithSource(input, ops)
def withoutInput: SubscriberFlow[In, Out] = SubscriberFlow(output, ops) def withoutSource: FlowWithSink[In, Out] = FlowWithSink(output, ops)
// FIXME
def run()(implicit materializer: FlowMaterializer): Unit =
produceTo(new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize))
// FIXME replace with run and input/output factories
def toPublisher[U >: Out]()(implicit materializer: FlowMaterializer): Publisher[U] =
input match {
case PublisherIn(p) materializer.toPublisher(ExistingPublisher(p), ops)
case IterableIn(iter) materializer.toPublisher(IterablePublisherNode(iter), ops)
case _ ???
}
def produceTo(subscriber: Subscriber[_ >: Out])(implicit materializer: FlowMaterializer): Unit =
toPublisher().subscribe(subscriber.asInstanceOf[Subscriber[Out]])
def run()(implicit materializer: FlowMaterializer): MaterializedFlow =
materializer.materialize(input, output, ops)
} }
class MaterializedFlow(sourceKey: AnyRef, matSource: AnyRef, sinkKey: AnyRef, matSink: AnyRef) extends MaterializedSource with MaterializedSink {
override def getSourceFor(key: AnyRef): AnyRef =
if (key == sourceKey) matSource
else throw new IllegalArgumentException(s"Source key [$key] doesn't match the source [$sourceKey] of this flow")
def getSinkFor(key: AnyRef): AnyRef =
if (key == sinkKey) matSink
else throw new IllegalArgumentException(s"Sink key [$key] doesn't match the sink [$sinkKey] of this flow")
}
trait MaterializedSource {
def getSourceFor(sourceKey: AnyRef): AnyRef
}
trait MaterializedSink {
def getSinkFor(sinkKey: AnyRef): AnyRef
}

View file

@ -15,6 +15,7 @@ import akka.actor.ActorContext
import akka.stream.impl2.StreamSupervisor import akka.stream.impl2.StreamSupervisor
import akka.stream.impl2.FlowNameCounter import akka.stream.impl2.FlowNameCounter
import akka.stream.MaterializerSettings import akka.stream.MaterializerSettings
import org.reactivestreams.Processor
object FlowMaterializer { object FlowMaterializer {
@ -22,8 +23,7 @@ object FlowMaterializer {
* Scala API: Creates a FlowMaterializer which will execute every step of a transformation * Scala API: Creates a FlowMaterializer which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
* will be used to create these actors, therefore it is *forbidden* to pass this object * will be used to create one actor that in turn creates actors for the transformation steps.
* to another actor if the factory is an ActorContext.
* *
* The `namePrefix` is used as the first part of the names of the actors running * The `namePrefix` is used as the first part of the names of the actors running
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
@ -49,8 +49,7 @@ object FlowMaterializer {
* Java API: Creates a FlowMaterializer which will execute every step of a transformation * Java API: Creates a FlowMaterializer which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
* will be used to create these actors, therefore it is *forbidden* to pass this object * will be used to create one actor that in turn creates actors for the transformation steps.
* to another actor if the factory is an ActorContext.
*/ */
def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer = def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer =
apply(settings)(context) apply(settings)(context)
@ -75,7 +74,12 @@ abstract class FlowMaterializer(val settings: MaterializerSettings) {
* INTERNAL API * INTERNAL API
* ops are stored in reverse order * ops are stored in reverse order
*/ */
private[akka] def toPublisher[I, O](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Publisher[O] private[akka] def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow
/**
* INTERNAL API
*/
private[akka] def identityProcessor[I](flowName: String): Processor[I, I]
} }

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.stream package akka.stream

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.stream package akka.stream

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.stream package akka.stream

View file

@ -11,30 +11,30 @@ import scala.concurrent.Future
class FlowSpec extends AkkaSpec { class FlowSpec extends AkkaSpec {
val intSeq = IterableIn(Seq(1, 2, 3)) val intSeq = IterableSource(Seq(1, 2, 3))
val strSeq = IterableIn(Seq("a", "b", "c")) val strSeq = IterableSource(Seq("a", "b", "c"))
import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.ExecutionContext.Implicits.global
val intFut = FutureIn(Future { 3 }) val intFut = FutureSource(Future { 3 })
implicit val materializer = FlowMaterializer(MaterializerSettings(system)) implicit val materializer = FlowMaterializer(MaterializerSettings(system))
"ProcessorFlow" should { "ProcessorFlow" should {
"go through all states" in { "go through all states" in {
val f: ProcessorFlow[Int, Int] = FlowFrom[Int] val f: ProcessorFlow[Int, Int] = FlowFrom[Int]
.withInput(intSeq) .withSource(intSeq)
.withOutput(PublisherOut()) .withSink(PublisherSink())
.withoutInput .withoutSource
.withoutOutput .withoutSink
} }
"should not run" in { "should not run" in {
val open: ProcessorFlow[Int, Int] = FlowFrom[Int] val open: ProcessorFlow[Int, Int] = FlowFrom[Int]
"open.run()" shouldNot compile "open.run()" shouldNot compile
} }
"accept IterableIn" in { "accept IterableSource" in {
val f: PublisherFlow[Int, Int] = FlowFrom[Int].withInput(intSeq) val f: FlowWithSource[Int, Int] = FlowFrom[Int].withSource(intSeq)
} }
"accept FutureIn" in { "accept FutureSource" in {
val f: PublisherFlow[Int, Int] = FlowFrom[Int].withInput(intFut) val f: FlowWithSource[Int, Int] = FlowFrom[Int].withSource(intFut)
} }
"append ProcessorFlow" in { "append ProcessorFlow" in {
val open1: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) val open1: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString)
@ -42,14 +42,14 @@ class FlowSpec extends AkkaSpec {
val open3: ProcessorFlow[Int, Int] = open1.append(open2) val open3: ProcessorFlow[Int, Int] = open1.append(open2)
"open3.run()" shouldNot compile "open3.run()" shouldNot compile
val closedInput: PublisherFlow[Int, Int] = open3.withInput(intSeq) val closedSource: FlowWithSource[Int, Int] = open3.withSource(intSeq)
"closedInput.run()" shouldNot compile "closedSource.run()" shouldNot compile
val closedOutput: SubscriberFlow[Int, Int] = open3.withOutput(PublisherOut()) val closedSink: FlowWithSink[Int, Int] = open3.withSink(PublisherSink())
"closedOutput.run()" shouldNot compile "closedSink.run()" shouldNot compile
closedInput.withOutput(PublisherOut()).run() closedSource.withSink(PublisherSink()).run()
closedOutput.withInput(intSeq).run() closedSink.withSource(intSeq).run()
} }
"prepend ProcessorFlow" in { "prepend ProcessorFlow" in {
val open1: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) val open1: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString)
@ -57,89 +57,89 @@ class FlowSpec extends AkkaSpec {
val open3: ProcessorFlow[String, String] = open1.prepend(open2) val open3: ProcessorFlow[String, String] = open1.prepend(open2)
"open3.run()" shouldNot compile "open3.run()" shouldNot compile
val closedInput: PublisherFlow[String, String] = open3.withInput(strSeq) val closedSource: FlowWithSource[String, String] = open3.withSource(strSeq)
"closedInput.run()" shouldNot compile "closedSource.run()" shouldNot compile
val closedOutput: SubscriberFlow[String, String] = open3.withOutput(PublisherOut()) val closedSink: FlowWithSink[String, String] = open3.withSink(PublisherSink())
"closedOutput.run()" shouldNot compile "closedSink.run()" shouldNot compile
closedInput.withOutput(PublisherOut()).run closedSource.withSink(PublisherSink()).run
closedOutput.withInput(strSeq).run closedSink.withSource(strSeq).run
} }
"append SubscriberFlow" in { "append FlowWithSink" in {
val open: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) val open: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString)
val closedOutput: SubscriberFlow[String, Int] = FlowFrom[String].map(_.hashCode).withOutput(PublisherOut()) val closedSink: FlowWithSink[String, Int] = FlowFrom[String].map(_.hashCode).withSink(PublisherSink())
val appended: SubscriberFlow[Int, Int] = open.append(closedOutput) val appended: FlowWithSink[Int, Int] = open.append(closedSink)
"appended.run()" shouldNot compile "appended.run()" shouldNot compile
"appended.toFuture" shouldNot compile "appended.toFuture" shouldNot compile
appended.withInput(intSeq).run appended.withSource(intSeq).run
} }
"prepend PublisherFlow" in { "prepend FlowWithSource" in {
val open: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) val open: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString)
val closedInput: PublisherFlow[String, Int] = FlowFrom[String].map(_.hashCode).withInput(strSeq) val closedSource: FlowWithSource[String, Int] = FlowFrom[String].map(_.hashCode).withSource(strSeq)
val prepended: PublisherFlow[String, String] = open.prepend(closedInput) val prepended: FlowWithSource[String, String] = open.prepend(closedSource)
"prepended.run()" shouldNot compile "prepended.run()" shouldNot compile
"prepended.withInput(strSeq)" shouldNot compile "prepended.withSource(strSeq)" shouldNot compile
prepended.withOutput(PublisherOut()).run prepended.withSink(PublisherSink()).run
} }
} }
"SubscriberFlow" should { "FlowWithSink" should {
val openInput: SubscriberFlow[Int, String] = val openSource: FlowWithSink[Int, String] =
FlowFrom[Int].map(_.toString).withOutput(PublisherOut()) FlowFrom[Int].map(_.toString).withSink(PublisherSink())
"accept Input" in { "accept Source" in {
openInput.withInput(intSeq) openSource.withSource(intSeq)
} }
"drop Output" in { "drop Sink" in {
openInput.withoutOutput openSource.withoutSink
} }
"not drop Input" in { "not drop Source" in {
"openInput.withoutInput" shouldNot compile "openSource.withoutSource" shouldNot compile
} }
"not accept Output" in { "not accept Sink" in {
"openInput.ToFuture" shouldNot compile "openSource.ToFuture" shouldNot compile
} }
"not run()" in { "not run()" in {
"openInput.run()" shouldNot compile "openSource.run()" shouldNot compile
} }
} }
"PublisherFlow" should { "FlowWithSource" should {
val openOutput: PublisherFlow[Int, String] = val openSink: FlowWithSource[Int, String] =
FlowFrom(Seq(1, 2, 3)).map(_.toString) FlowFrom(Seq(1, 2, 3)).map(_.toString)
"accept Output" in { "accept Sink" in {
openOutput.withOutput(PublisherOut()) openSink.withSink(PublisherSink())
} }
"drop Input" in { "drop Source" in {
openOutput.withoutInput openSink.withoutSource
} }
"not drop Output" in { "not drop Sink" in {
"openOutput.withoutOutput" shouldNot compile "openSink.withoutSink" shouldNot compile
} }
"not accept Input" in { "not accept Source" in {
"openOutput.withInput(intSeq)" shouldNot compile "openSink.withSource(intSeq)" shouldNot compile
} }
"not run()" in { "not run()" in {
"openOutput.run()" shouldNot compile "openSink.run()" shouldNot compile
} }
} }
"RunnableFlow" should { "RunnableFlow" should {
val closed: RunnableFlow[Int, String] = val closed: RunnableFlow[Int, String] =
FlowFrom(Seq(1, 2, 3)).map(_.toString).withOutput(PublisherOut()) FlowFrom(Seq(1, 2, 3)).map(_.toString).withSink(PublisherSink())
"run" in { "run" in {
closed.run() closed.run()
} }
"drop Input" in { "drop Source" in {
closed.withoutInput closed.withoutSource
} }
"drop Output" in { "drop Sink" in {
closed.withoutOutput closed.withoutSink
} }
"not accept Input" in { "not accept Source" in {
"closed.withInput(intSeq)" shouldNot compile "closed.withSource(intSeq)" shouldNot compile
} }
"not accept Output" in { "not accept Sink" in {
"closed.ToFuture" shouldNot compile "closed.ToFuture" shouldNot compile
} }
} }

View file

@ -23,7 +23,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"A Flow with transform operations" must { "A Flow with transform operations" must {
"produce one-to-one transformation as expected" in { "produce one-to-one transformation as expected" in {
val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() val p = FlowFrom(List(1, 2, 3)).toPublisher()
val p2 = FlowFrom(p). val p2 = FlowFrom(p).
transform("transform", () new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
var tot = 0 var tot = 0
@ -32,7 +32,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
List(tot) List(tot)
} }
}). }).
withOutput(PublisherOut()).toPublisher() toPublisher()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber) p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription() val subscription = subscriber.expectSubscription()
@ -46,7 +46,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
} }
"produce one-to-several transformation as expected" in { "produce one-to-several transformation as expected" in {
val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() val p = FlowFrom(List(1, 2, 3)).toPublisher()
val p2 = FlowFrom(p). val p2 = FlowFrom(p).
transform("transform", () new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
var tot = 0 var tot = 0
@ -55,7 +55,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
Vector.fill(elem)(tot) Vector.fill(elem)(tot)
} }
}). }).
withOutput(PublisherOut()).toPublisher() toPublisher()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber) p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription() val subscription = subscriber.expectSubscription()
@ -72,7 +72,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
} }
"produce dropping transformation as expected" in { "produce dropping transformation as expected" in {
val p = FlowFrom(List(1, 2, 3, 4)).withOutput(PublisherOut()).toPublisher() val p = FlowFrom(List(1, 2, 3, 4)).toPublisher()
val p2 = FlowFrom(p). val p2 = FlowFrom(p).
transform("transform", () new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
var tot = 0 var tot = 0
@ -85,7 +85,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
} }
} }
}). }).
withOutput(PublisherOut()).toPublisher() toPublisher()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber) p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription() val subscription = subscriber.expectSubscription()
@ -99,7 +99,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
} }
"produce multi-step transformation as expected" in { "produce multi-step transformation as expected" in {
val p = FlowFrom(List("a", "bc", "def")).withOutput(PublisherOut()).toPublisher() val p = FlowFrom(List("a", "bc", "def")).toPublisher()
val p2 = FlowFrom(p). val p2 = FlowFrom(p).
transform("transform", () new Transformer[String, Int] { transform("transform", () new Transformer[String, Int] {
var concat = "" var concat = ""
@ -115,7 +115,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
List(tot) List(tot)
} }
}). }).
withOutput(PublisherOut()).toPublisher() toPublisher()
val c1 = StreamTestKit.SubscriberProbe[Int]() val c1 = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(c1) p2.subscribe(c1)
val sub1 = c1.expectSubscription() val sub1 = c1.expectSubscription()
@ -138,7 +138,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
} }
"invoke onComplete when done" in { "invoke onComplete when done" in {
val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher() val p = FlowFrom(List("a")).toPublisher()
val p2 = FlowFrom(p). val p2 = FlowFrom(p).
transform("transform", () new Transformer[String, String] { transform("transform", () new Transformer[String, String] {
var s = "" var s = ""
@ -148,7 +148,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
} }
override def onTermination(e: Option[Throwable]) = List(s + "B") override def onTermination(e: Option[Throwable]) = List(s + "B")
}). }).
withOutput(PublisherOut()).toPublisher() toPublisher()
val c = StreamTestKit.SubscriberProbe[String]() val c = StreamTestKit.SubscriberProbe[String]()
p2.subscribe(c) p2.subscribe(c)
val s = c.expectSubscription() val s = c.expectSubscription()
@ -159,7 +159,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"invoke cleanup when done" in { "invoke cleanup when done" in {
val cleanupProbe = TestProbe() val cleanupProbe = TestProbe()
val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher() val p = FlowFrom(List("a")).toPublisher()
val p2 = FlowFrom(p). val p2 = FlowFrom(p).
transform("transform", () new Transformer[String, String] { transform("transform", () new Transformer[String, String] {
var s = "" var s = ""
@ -170,7 +170,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
override def onTermination(e: Option[Throwable]) = List(s + "B") override def onTermination(e: Option[Throwable]) = List(s + "B")
override def cleanup() = cleanupProbe.ref ! s override def cleanup() = cleanupProbe.ref ! s
}). }).
withOutput(PublisherOut()).toPublisher() toPublisher()
val c = StreamTestKit.SubscriberProbe[String]() val c = StreamTestKit.SubscriberProbe[String]()
p2.subscribe(c) p2.subscribe(c)
val s = c.expectSubscription() val s = c.expectSubscription()
@ -182,7 +182,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"invoke cleanup when done consume" in { "invoke cleanup when done consume" in {
val cleanupProbe = TestProbe() val cleanupProbe = TestProbe()
val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher() val p = FlowFrom(List("a")).toPublisher()
FlowFrom(p). FlowFrom(p).
transform("transform", () new Transformer[String, String] { transform("transform", () new Transformer[String, String] {
var s = "x" var s = "x"
@ -192,13 +192,13 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
} }
override def cleanup() = cleanupProbe.ref ! s override def cleanup() = cleanupProbe.ref ! s
}). }).
withOutput(BlackholeOut()).run() withSink(BlackholeSink()).run()
cleanupProbe.expectMsg("a") cleanupProbe.expectMsg("a")
} }
"invoke cleanup when done after error" in { "invoke cleanup when done after error" in {
val cleanupProbe = TestProbe() val cleanupProbe = TestProbe()
val p = FlowFrom(List("a", "b", "c")).withOutput(PublisherOut()).toPublisher() val p = FlowFrom(List("a", "b", "c")).toPublisher()
val p2 = FlowFrom(p). val p2 = FlowFrom(p).
transform("transform", () new Transformer[String, String] { transform("transform", () new Transformer[String, String] {
var s = "" var s = ""
@ -214,7 +214,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
override def onTermination(e: Option[Throwable]) = List(s + "B") override def onTermination(e: Option[Throwable]) = List(s + "B")
override def cleanup() = cleanupProbe.ref ! s override def cleanup() = cleanupProbe.ref ! s
}). }).
withOutput(PublisherOut()).toPublisher() toPublisher()
val c = StreamTestKit.SubscriberProbe[String]() val c = StreamTestKit.SubscriberProbe[String]()
p2.subscribe(c) p2.subscribe(c)
val s = c.expectSubscription() val s = c.expectSubscription()
@ -236,7 +236,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
} }
override def isComplete = s == "1" override def isComplete = s == "1"
}). }).
withOutput(PublisherOut()).toPublisher() toPublisher()
val proc = p.expectSubscription val proc = p.expectSubscription
val c = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(c) p2.subscribe(c)
@ -263,7 +263,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
override def onTermination(e: Option[Throwable]) = List(s.length + 10) override def onTermination(e: Option[Throwable]) = List(s.length + 10)
override def cleanup() = cleanupProbe.ref ! s override def cleanup() = cleanupProbe.ref ! s
}). }).
withOutput(PublisherOut()).toPublisher() toPublisher()
val proc = p.expectSubscription val proc = p.expectSubscription
val c = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(c) p2.subscribe(c)
@ -279,7 +279,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
} }
"report error when exception is thrown" in { "report error when exception is thrown" in {
val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() val p = FlowFrom(List(1, 2, 3)).toPublisher()
val p2 = FlowFrom(p). val p2 = FlowFrom(p).
transform("transform", () new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = { override def onNext(elem: Int) = {
@ -290,7 +290,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
} }
} }
}). }).
withOutput(PublisherOut()).toPublisher() toPublisher()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber) p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription() val subscription = subscriber.expectSubscription()
@ -304,12 +304,12 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
} }
"support cancel as expected" in { "support cancel as expected" in {
val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() val p = FlowFrom(List(1, 2, 3)).toPublisher()
val p2 = FlowFrom(p). val p2 = FlowFrom(p).
transform("transform", () new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = List(elem, elem) override def onNext(elem: Int) = List(elem, elem)
}). }).
withOutput(PublisherOut()).toPublisher() toPublisher()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber) p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription() val subscription = subscriber.expectSubscription()
@ -323,13 +323,13 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
} }
"support producing elements from empty inputs" in { "support producing elements from empty inputs" in {
val p = FlowFrom(List.empty[Int]).withOutput(PublisherOut()).toPublisher() val p = FlowFrom(List.empty[Int]).toPublisher()
val p2 = FlowFrom(p). val p2 = FlowFrom(p).
transform("transform", () new Transformer[Int, Int] { transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = Nil override def onNext(elem: Int) = Nil
override def onTermination(e: Option[Throwable]) = List(1, 2, 3) override def onTermination(e: Option[Throwable]) = List(1, 2, 3)
}). }).
withOutput(PublisherOut()).toPublisher() toPublisher()
val subscriber = StreamTestKit.SubscriberProbe[Int]() val subscriber = StreamTestKit.SubscriberProbe[Int]()
p2.subscribe(subscriber) p2.subscribe(subscriber)
val subscription = subscriber.expectSubscription() val subscription = subscriber.expectSubscription()
@ -363,7 +363,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
case _ Nil case _ Nil
} }
} }
}).withOutput(PublisherOut()).produceTo(subscriber) }).publishTo(subscriber)
val subscription = subscriber.expectSubscription() val subscription = subscriber.expectSubscription()
subscription.request(10) subscription.request(10)
@ -383,16 +383,16 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
count += 1 count += 1
List(count) List(count)
} }
}).withOutput(PublisherOut()) })
val s1 = StreamTestKit.SubscriberProbe[Int]() val s1 = StreamTestKit.SubscriberProbe[Int]()
flow.produceTo(s1) flow.publishTo(s1)
s1.expectSubscription().request(3) s1.expectSubscription().request(3)
s1.expectNext(1, 2, 3) s1.expectNext(1, 2, 3)
s1.expectComplete() s1.expectComplete()
val s2 = StreamTestKit.SubscriberProbe[Int]() val s2 = StreamTestKit.SubscriberProbe[Int]()
flow.produceTo(s2) flow.publishTo(s2)
s2.expectSubscription().request(3) s2.expectSubscription().request(3)
s2.expectNext(1, 2, 3) s2.expectNext(1, 2, 3)
s2.expectComplete() s2.expectComplete()