diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index 5272ca50d8..e877a89598 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -14,8 +14,7 @@ object FlowMaterializer { * Scala API: Creates a FlowMaterializer which will execute every step of a transformation * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) - * will be used to create these actors, therefore it is *forbidden* to pass this object - * to another actor if the factory is an ActorContext. + * will be used to create one actor that in turn creates actors for the transformation steps. * * The materializer's [[akka.stream.MaterializerSettings]] will be obtained from the * configuration of the `context`'s underlying [[akka.actor.ActorSystem]]. @@ -83,8 +82,7 @@ object FlowMaterializer { * Java API: Creates a FlowMaterializer which will execute every step of a transformation * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) - * will be used to create these actors, therefore it is *forbidden* to pass this object - * to another actor if the factory is an ActorContext. + * will be used to create one actor that in turn creates actors for the transformation steps. */ def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer = apply(Option(settings), None)(context) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 50437f10f8..cd94b89ffa 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -76,13 +76,13 @@ private[akka] object Ast { final case class IteratorPublisherNode[I](iterator: Iterator[I]) extends PublisherNode[I] { final def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] = - if (iterator.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]] + if (iterator.isEmpty) EmptyPublisher[I] else ActorPublisher[I](materializer.actorOf(IteratorPublisher.props(iterator, materializer.settings), name = s"$flowName-0-iterator")) } final case class IterablePublisherNode[I](iterable: immutable.Iterable[I]) extends PublisherNode[I] { def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] = - if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]] + if (iterable.isEmpty) EmptyPublisher[I] else ActorPublisher[I](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings), name = s"$flowName-0-iterable"), Some(iterable)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 97b4fc8586..14a9f7c66f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -10,6 +10,7 @@ import org.reactivestreams.{ Subscriber, Publisher } */ private[akka] case object EmptyPublisher extends Publisher[Nothing] { def subscribe(subscriber: Subscriber[Nothing]): Unit = subscriber.onComplete() + def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] } /** @@ -17,4 +18,5 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] { */ private[akka] case class ErrorPublisher(t: Throwable) extends Publisher[Nothing] { def subscribe(subscriber: Subscriber[Nothing]): Unit = subscriber.onError(t) + def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala b/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala index f36038153e..1de4a968ba 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala @@ -14,7 +14,7 @@ import scala.util.control.NonFatal */ private[akka] object SynchronousPublisherFromIterable { def apply[T](iterable: immutable.Iterable[T]): Publisher[T] = - if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[T]] + if (iterable.isEmpty) EmptyPublisher[T] else new SynchronousPublisherFromIterable(iterable) private class IteratorSubscription[T](subscriber: Subscriber[T], iterator: Iterator[T]) extends Subscription { diff --git a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala index 81c24b131f..6eb6390d57 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2013 Typesafe Inc. + * Copyright (C) 2009-2014 Typesafe Inc. */ package akka.stream.impl diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala index a6024b9f51..5264fc1ca5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -4,23 +4,18 @@ package akka.stream.impl2 import java.util.concurrent.atomic.AtomicLong -import akka.actor.{ Actor, ActorCell, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider, LocalActorRef, Props, RepointableActorRef } -import akka.pattern.ask -import org.reactivestreams.{ Processor, Publisher, Subscriber } + import scala.annotation.tailrec import scala.collection.immutable -import scala.concurrent.{ Await, Future } -import scala.concurrent.duration._ -import scala.util.{ Failure, Success } -import akka.stream.Transformer -import akka.stream.scaladsl2.FlowMaterializer -import akka.stream.MaterializerSettings -import akka.stream.impl.EmptyPublisher -import akka.stream.impl.ActorPublisher -import akka.stream.impl.IterablePublisher -import akka.stream.impl.TransformProcessorImpl -import akka.stream.impl.ActorProcessor -import akka.stream.impl.ExposedPublisher +import scala.concurrent.Await + +import org.reactivestreams.{ Processor, Publisher, Subscriber } + +import akka.actor._ +import akka.pattern.ask +import akka.stream.{ MaterializerSettings, Transformer } +import akka.stream.impl.{ ActorProcessor, ActorPublisher, ExposedPublisher, TransformProcessorImpl } +import akka.stream.scaladsl2._ /** * INTERNAL API @@ -32,32 +27,17 @@ private[akka] object Ast { case class Transform(name: String, mkTransformer: () ⇒ Transformer[Any, Any]) extends AstNode - trait PublisherNode[I] { - private[akka] def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] - } - - final case class ExistingPublisher[I](publisher: Publisher[I]) extends PublisherNode[I] { - def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String) = publisher - } - - final case class IterablePublisherNode[I](iterable: immutable.Iterable[I]) extends PublisherNode[I] { - def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] = - if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]] - else ActorPublisher[I](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings), - name = s"$flowName-0-iterable"), Some(iterable)) - } - } /** * INTERNAL API */ -private[akka] case class ActorBasedFlowMaterializer( - override val settings: MaterializerSettings, - supervisor: ActorRef, - flowNameCounter: AtomicLong, - namePrefix: String) +case class ActorBasedFlowMaterializer(override val settings: MaterializerSettings, + supervisor: ActorRef, + flowNameCounter: AtomicLong, + namePrefix: String) extends FlowMaterializer(settings) { + import akka.stream.impl2.Ast._ def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name) @@ -78,16 +58,57 @@ private[akka] case class ActorBasedFlowMaterializer( } // Ops come in reverse order - override def toPublisher[I, O](publisherNode: PublisherNode[I], ops: List[AstNode]): Publisher[O] = { + override def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow = { val flowName = createFlowName() - if (ops.isEmpty) publisherNode.createPublisher(this, flowName).asInstanceOf[Publisher[O]] - else { - val opsSize = ops.size - val opProcessor = processorForNode(ops.head, flowName, opsSize) - val topSubscriber = processorChain(opProcessor, ops.tail, flowName, opsSize - 1) - publisherNode.createPublisher(this, flowName).subscribe(topSubscriber.asInstanceOf[Subscriber[I]]) - opProcessor.asInstanceOf[Publisher[O]] + + def attachSink(pub: Publisher[Out]) = sink match { + case s: SimpleSink[Out] ⇒ s.attach(pub, this, flowName) + case s: SinkWithKey[Out, _] ⇒ s.attach(pub, this, flowName) + case _ ⇒ throw new MaterializationException("unknown Sink type " + sink.getClass) } + def attachSource(sub: Subscriber[In]) = source match { + case s: SimpleSource[In] ⇒ s.attach(sub, this, flowName) + case s: SourceWithKey[In, _] ⇒ s.attach(sub, this, flowName) + case _ ⇒ throw new MaterializationException("unknown Source type " + sink.getClass) + } + def createSink() = sink.asInstanceOf[Sink[In]] match { + case s: SimpleSink[In] ⇒ s.create(this, flowName) -> (()) + case s: SinkWithKey[In, _] ⇒ s.create(this, flowName) + case _ ⇒ throw new MaterializationException("unknown Sink type " + sink.getClass) + } + def createSource() = source.asInstanceOf[Source[Out]] match { + case s: SimpleSource[Out] ⇒ s.create(this, flowName) -> (()) + case s: SourceWithKey[Out, _] ⇒ s.create(this, flowName) + case _ ⇒ throw new MaterializationException("unknown Source type " + sink.getClass) + } + def isActive(s: AnyRef) = s match { + case source: SimpleSource[_] ⇒ source.isActive + case source: SourceWithKey[_, _] ⇒ source.isActive + case sink: SimpleSink[_] ⇒ sink.isActive + case sink: SinkWithKey[_, _] ⇒ sink.isActive + case _: Source[_] ⇒ throw new MaterializationException("unknown Source type " + sink.getClass) + case _: Sink[_] ⇒ throw new MaterializationException("unknown Sink type " + sink.getClass) + } + + val (sourceValue, sinkValue) = + if (ops.isEmpty) { + if (isActive(sink)) { + val (sub, value) = createSink() + (attachSource(sub), value) + } else if (isActive(source)) { + val (pub, value) = createSource() + (value, attachSink(pub)) + } else { + val id: Processor[In, Out] = processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[In, Out]] + (attachSource(id), attachSink(id)) + } + } else { + val opsSize = ops.size + val last = processorForNode(ops.head, flowName, opsSize).asInstanceOf[Processor[Any, Out]] + val first = processorChain(last, ops.tail, flowName, opsSize - 1).asInstanceOf[Processor[In, Any]] + (attachSource(first), attachSink(last)) + } + new MaterializedFlow(source, sourceValue, sink, sinkValue) } private val identityTransform = Transform("identity", () ⇒ @@ -95,7 +116,7 @@ private[akka] case class ActorBasedFlowMaterializer( override def onNext(element: Any) = List(element) }) - def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { + private def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { val impl = actorOf(ActorProcessorFactory.props(settings, op), s"$flowName-$n-${op.name}") ActorProcessorFactory(impl) } @@ -168,4 +189,4 @@ private[akka] object ActorProcessorFactory { impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]]) p } -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index e6e4a90208..23bdd9167c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -6,122 +6,47 @@ package akka.stream.scaladsl2 import scala.language.higherKinds import scala.collection.immutable import scala.concurrent.Future -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber -import akka.stream.Transformer +import akka.stream._ import akka.stream.impl.BlackholeSubscriber import akka.stream.impl2.Ast._ +import scala.annotation.unchecked.uncheckedVariance +import akka.stream.impl.BlackholeSubscriber +import scala.concurrent.Promise +import akka.stream.impl.EmptyPublisher +import akka.stream.impl.IterablePublisher +import akka.stream.impl2.ActorBasedFlowMaterializer +import org.reactivestreams._ +import scala.concurrent.duration.FiniteDuration +import scala.util.Try +import scala.util.Failure +import scala.util.Success +/** + * This is the interface from which all concrete Flows inherit. No generic + * operations are presented because the concrete type of Flow (i.e. whether + * it has a [[Source]] or a [[Sink]]) determines what is available. + */ sealed trait Flow -object FlowFrom { - /** - * Helper to create `Flow` without [[Input]]. - * Example usage: `FlowFrom[Int]` - */ - def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil) - - /** - * Helper to create `Flow` with Input from `Iterable`. - * Example usage: `FlowFrom(Seq(1,2,3))` - */ - def apply[T](i: immutable.Iterable[T]): PublisherFlow[T, T] = FlowFrom[T].withInput(IterableIn(i)) - - /** - * Helper to create `Flow` with [[Input]] from `Publisher`. - */ - def apply[T](p: Publisher[T]): PublisherFlow[T, T] = FlowFrom[T].withInput(PublisherIn(p)) -} - -trait Input[-In] +/** + * Marker interface for flows that have a free (attachable) input side. + */ +sealed trait HasNoSource[-In] extends Flow /** - * Default input. - * Allows to materialize a Flow with this input to Subscriber. + * Marker interface for flows that have a free (attachable) output side. */ -final case class SubscriberIn[-In]() extends Input[In] { - def subscriber[I <: In]: Subscriber[I] = ??? -} +sealed trait HasNoSink[+Out] extends Flow /** - * Input from Publisher. + * Operations offered by flows with a free output side: the DSL flows left-to-right only. */ -final case class PublisherIn[-In](p: Publisher[_ >: In]) extends Input[In] - -/** - * Input from Iterable - * - * Changing In from Contravariant to Covariant is needed because Iterable[+A]. - * But this brakes IterableIn variance and we get IterableIn(Seq(1,2,3)): IterableIn[Any] - */ -final case class IterableIn[-In](i: immutable.Iterable[_ >: In]) extends Input[In] - -/** - * Input from Future - * - * Changing In from Contravariant to Covariant is needed because Future[+A]. - * But this brakes FutureIn variance and we get FutureIn(Future{1}): FutureIn[Any] - */ -final case class FutureIn[-In](f: Future[_ >: In]) extends Input[In] - -trait Output[+Out] - -/** - * Default output. - * Allows to materialize a Flow with this output to Publisher. - */ -final case class PublisherOut[+Out]() extends Output[Out] { - def publisher[O >: Out]: Publisher[O] = ??? -} - -final case class BlackholeOut[+Out]() extends Output[Out] { - def publisher[O >: Out]: Publisher[O] = ??? -} - -/** - * Output to a Subscriber. - */ -final case class SubscriberOut[+Out](s: Subscriber[_ <: Out]) extends Output[Out] - -/** - * Fold output. Reduces output stream according to the given fold function. - */ -final case class FoldOut[T, +Out](zero: T)(f: (T, Out) ⇒ T) extends Output[Out] { - def future: Future[T] = ??? -} - -/** - * Operations with a Flow which has open (no attached) Input. - * - * No Out type parameter would be useful for Graph signatures, but we need it here - * for `withInput` and `prependTransform` methods. - */ -sealed trait HasOpenInput[-In, +Out] extends Flow { - type Repr[-In, +Out] <: HasOpenInput[In, Out] - type AfterCloseInput[-In, +Out] <: Flow - - def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] - - def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] - def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] - -} - -/** - * Operations with a Flow which has open (no attached) Output. - * - * No In type parameter would be useful for Graph signatures, but we need it here - * for `withOutput`. - */ -trait HasOpenOutput[-In, +Out] extends Flow { - type Repr[-In, +Out] <: HasOpenOutput[In, Out] - type AfterCloseOutput[-In, +Out] <: Flow +trait FlowOps[-In, +Out] extends HasNoSink[Out] { + type Repr[-I, +O] <: FlowOps[I, O] // Storing ops in reverse order protected def andThen[U](op: AstNode): Repr[In, U] - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] - def map[T](f: Out ⇒ T): Repr[In, T] = transform("map", () ⇒ new Transformer[Out, T] { override def onNext(in: Out) = List(f(in)) @@ -130,89 +55,98 @@ trait HasOpenOutput[-In, +Out] extends Flow { def transform[T](name: String, mkTransformer: () ⇒ Transformer[Out, T]): Repr[In, T] = { andThen(Transform(name, mkTransformer.asInstanceOf[() ⇒ Transformer[Any, Any]])) } - - def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] - def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] } /** * Flow without attached input and without attached output, can be used as a `Processor`. */ -final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends HasOpenOutput[In, Out] with HasOpenInput[In, Out] { - override type Repr[-In, +Out] = ProcessorFlow[In, Out] - type AfterCloseOutput[-In, +Out] = SubscriberFlow[In, Out] - type AfterCloseInput[-In, +Out] = PublisherFlow[In, Out] +final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends FlowOps[In, Out] with HasNoSource[In] { + override type Repr[-I, +O] = ProcessorFlow[I, O] override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops) - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = SubscriberFlow(out, ops) - def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = PublisherFlow(in, ops) + def withSink(out: Sink[Out]): FlowWithSink[In, Out] = FlowWithSink(out, ops) + def withSource(in: Source[In]): FlowWithSource[In, Out] = FlowWithSource(in, ops) - override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] = - ProcessorFlow(ops ::: f.ops) - override def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = - PublisherFlow(f.input, ops ::: f.ops) + def prepend[T](f: ProcessorFlow[T, In]): ProcessorFlow[T, Out] = ProcessorFlow(ops ::: f.ops) + def prepend[T](f: FlowWithSource[T, In]): FlowWithSource[T, Out] = f.append(this) - override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = ProcessorFlow(f.ops ++: ops) - override def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = - SubscriberFlow(f.output, f.ops ++: ops) + def append[T](f: ProcessorFlow[Out, T]): ProcessorFlow[In, T] = ProcessorFlow(f.ops ++: ops) + def append[T](f: FlowWithSink[Out, T]): FlowWithSink[In, T] = f.prepend(this) } /** * Flow with attached output, can be used as a `Subscriber`. */ -final case class SubscriberFlow[-In, +Out](output: Output[Out], ops: List[AstNode]) extends HasOpenInput[In, Out] { - type Repr[-In, +Out] = SubscriberFlow[In, Out] - type AfterCloseInput[-In, +Out] = RunnableFlow[In, Out] +final case class FlowWithSink[-In, +Out](private[scaladsl2] val output: Sink[Out @uncheckedVariance], ops: List[AstNode]) extends HasNoSource[In] { - def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] = RunnableFlow(in, output, ops) - def withoutOutput: ProcessorFlow[In, Out] = ProcessorFlow(ops) + def withSource(in: Source[In]): RunnableFlow[In, Out] = RunnableFlow(in, output, ops) + def withoutSink: ProcessorFlow[In, Out] = ProcessorFlow(ops) - override def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] = - SubscriberFlow(output, ops ::: f.ops) - override def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] = - RunnableFlow(f.input, output, ops ::: f.ops) + def prepend[T](f: ProcessorFlow[T, In]): FlowWithSink[T, Out] = FlowWithSink(output, ops ::: f.ops) + def prepend[T](f: FlowWithSource[T, In]): RunnableFlow[T, Out] = RunnableFlow(f.input, output, ops ::: f.ops) + + def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] = { + val subIn = SubscriberSource[In]() + val mf = withSource(subIn).run() + subIn.subscriber(mf) + } } /** * Flow with attached input, can be used as a `Publisher`. */ -final case class PublisherFlow[-In, +Out](input: Input[In], ops: List[AstNode]) extends HasOpenOutput[In, Out] { - override type Repr[-In, +Out] = PublisherFlow[In, Out] - type AfterCloseOutput[-In, +Out] = RunnableFlow[In, Out] +final case class FlowWithSource[-In, +Out](private[scaladsl2] val input: Source[In @uncheckedVariance], ops: List[AstNode]) extends FlowOps[In, Out] { + override type Repr[-I, +O] = FlowWithSource[I, O] override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops) - def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] = RunnableFlow(input, out, ops) - def withoutInput: ProcessorFlow[In, Out] = ProcessorFlow(ops) + def withSink(out: Sink[Out]): RunnableFlow[In, Out] = RunnableFlow(input, out, ops) + def withoutSource: ProcessorFlow[In, Out] = ProcessorFlow(ops) - override def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] = PublisherFlow(input, f.ops ++: ops) - override def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] = - RunnableFlow(input, f.output, f.ops ++: ops) + def append[T](f: ProcessorFlow[Out, T]): FlowWithSource[In, T] = FlowWithSource(input, f.ops ++: ops) + def append[T](f: FlowWithSink[Out, T]): RunnableFlow[In, T] = RunnableFlow(input, f.output, f.ops ++: ops) + + def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = { + val pubOut = PublisherSink[Out] + val mf = withSink(pubOut).run() + pubOut.publisher(mf) + } + + def publishTo(subscriber: Subscriber[Out @uncheckedVariance])(implicit materializer: FlowMaterializer): Unit = + toPublisher().subscribe(subscriber) + + def consume()(implicit materializer: FlowMaterializer): Unit = + withSink(BlackholeSink).run() } /** * Flow with attached input and output, can be executed. */ -final case class RunnableFlow[-In, +Out](input: Input[In], output: Output[Out], ops: List[AstNode]) extends Flow { - def withoutOutput: PublisherFlow[In, Out] = PublisherFlow(input, ops) - def withoutInput: SubscriberFlow[In, Out] = SubscriberFlow(output, ops) - - // FIXME - def run()(implicit materializer: FlowMaterializer): Unit = - produceTo(new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize)) - - // FIXME replace with run and input/output factories - def toPublisher[U >: Out]()(implicit materializer: FlowMaterializer): Publisher[U] = - input match { - case PublisherIn(p) ⇒ materializer.toPublisher(ExistingPublisher(p), ops) - case IterableIn(iter) ⇒ materializer.toPublisher(IterablePublisherNode(iter), ops) - case _ ⇒ ??? - } - - def produceTo(subscriber: Subscriber[_ >: Out])(implicit materializer: FlowMaterializer): Unit = - toPublisher().subscribe(subscriber.asInstanceOf[Subscriber[Out]]) +final case class RunnableFlow[-In, +Out](private[scaladsl2] val input: Source[In @uncheckedVariance], + private[scaladsl2] val output: Sink[Out @uncheckedVariance], ops: List[AstNode]) extends Flow { + def withoutSink: FlowWithSource[In, Out] = FlowWithSource(input, ops) + def withoutSource: FlowWithSink[In, Out] = FlowWithSink(output, ops) + def run()(implicit materializer: FlowMaterializer): MaterializedFlow = + materializer.materialize(input, output, ops) } +class MaterializedFlow(sourceKey: AnyRef, matSource: Any, sinkKey: AnyRef, matSink: Any) extends MaterializedSource with MaterializedSink { + override def getSourceFor[T](key: SourceWithKey[_, T]): T = + if (key == sourceKey) matSource.asInstanceOf[T] + else throw new IllegalArgumentException(s"Source key [$key] doesn't match the source [$sourceKey] of this flow") + + def getSinkFor[T](key: SinkWithKey[_, T]): T = + if (key == sinkKey) matSink.asInstanceOf[T] + else throw new IllegalArgumentException(s"Sink key [$key] doesn't match the sink [$sinkKey] of this flow") +} + +trait MaterializedSource { + def getSourceFor[T](sourceKey: SourceWithKey[_, T]): T +} + +trait MaterializedSink { + def getSinkFor[T](sinkKey: SinkWithKey[_, T]): T +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala index 8bb91e81d5..f444f0ed94 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala @@ -3,21 +3,32 @@ */ package akka.stream.scaladsl2 -import scala.concurrent.duration.FiniteDuration -import akka.actor.ActorRefFactory -import akka.stream.impl2.ActorBasedFlowMaterializer -import akka.stream.impl2.Ast -import org.reactivestreams.{ Publisher, Subscriber } -import scala.concurrent.duration._ -import akka.actor.Deploy -import akka.actor.ExtendedActorSystem -import akka.actor.ActorContext -import akka.stream.impl2.StreamSupervisor -import akka.stream.impl2.FlowNameCounter +import akka.actor.{ ActorContext, ActorRefFactory, ActorSystem, ExtendedActorSystem } import akka.stream.MaterializerSettings +import akka.stream.impl2.{ ActorBasedFlowMaterializer, Ast, FlowNameCounter, StreamSupervisor } object FlowMaterializer { + /** + * Scala API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create one actor that in turn creates actors for the transformation steps. + * + * The materializer's [[akka.stream.MaterializerSettings]] will be obtained from the + * configuration of the `context`'s underlying [[akka.actor.ActorSystem]]. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def apply(materializerSettings: Option[MaterializerSettings] = None, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = { + val system = actorSystemOf(context) + + val settings = materializerSettings getOrElse MaterializerSettings(system) + apply(settings, namePrefix.getOrElse("flow"))(context) + } + /** * Scala API: Creates a FlowMaterializer which will execute every step of a transformation * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] @@ -29,31 +40,77 @@ object FlowMaterializer { * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of * `namePrefix-flowNumber-flowStepNumber-stepName`. */ - def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = { - val system = context match { - case s: ExtendedActorSystem ⇒ s - case c: ActorContext ⇒ c.system - case null ⇒ throw new IllegalArgumentException("ActorRefFactory context must be defined") - case _ ⇒ throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, " + - "got [${_contex.getClass.getName}]") - } + def apply(materializerSettings: MaterializerSettings, namePrefix: String)(implicit context: ActorRefFactory): FlowMaterializer = { + val system = actorSystemOf(context) new ActorBasedFlowMaterializer( - settings, - context.actorOf(StreamSupervisor.props(settings).withDispatcher(settings.dispatcher)), + materializerSettings, + context.actorOf(StreamSupervisor.props(materializerSettings).withDispatcher(materializerSettings.dispatcher)), FlowNameCounter(system).counter, - namePrefix.getOrElse("flow")) + namePrefix) } + /** + * Scala API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def apply(materializerSettings: MaterializerSettings)(implicit context: ActorRefFactory): FlowMaterializer = + apply(Some(materializerSettings), None) + /** * Java API: Creates a FlowMaterializer which will execute every step of a transformation * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) * will be used to create these actors, therefore it is *forbidden* to pass this object * to another actor if the factory is an ActorContext. + * + * Defaults the actor name prefix used to name actors running the processing steps to `"flow"`. + * The actor names are built up of `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def create(context: ActorRefFactory): FlowMaterializer = + apply()(context) + + /** + * Java API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create one actor that in turn creates actors for the transformation steps. */ def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer = - apply(settings)(context) + apply(Option(settings), None)(context) + + /** + * Java API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def create(settings: MaterializerSettings, context: ActorRefFactory, namePrefix: String): FlowMaterializer = + apply(Option(settings), Option(namePrefix))(context) + + private def actorSystemOf(context: ActorRefFactory): ActorSystem = { + val system = context match { + case s: ExtendedActorSystem ⇒ s + case c: ActorContext ⇒ c.system + case null ⇒ throw new IllegalArgumentException("ActorRefFactory context must be defined") + case _ ⇒ + throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, got [${context.getClass.getName}]") + } + system + } + } /** @@ -66,16 +123,25 @@ object FlowMaterializer { abstract class FlowMaterializer(val settings: MaterializerSettings) { /** - * The `namePrefix` is used as the first part of the names of the actors running - * the processing steps. + * The `namePrefix` shall be used for deriving the names of processing + * entities that are created during materialization. This is meant to aid + * logging and error reporting both during materialization and while the + * stream is running. */ def withNamePrefix(name: String): FlowMaterializer /** - * INTERNAL API - * ops are stored in reverse order + * This method interprets the given Flow description and creates the running + * stream. The result can be highly implementation specific, ranging from + * local actor chains to remote-deployed processing networks. */ - private[akka] def toPublisher[I, O](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Publisher[O] + def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow } +/** + * This exception or subtypes thereof should be used to signal materialization + * failures. + */ +class MaterializationException(msg: String, cause: Throwable = null) extends RuntimeException(msg, cause) + diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala new file mode 100644 index 0000000000..81ac1cf31f --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala @@ -0,0 +1,218 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.annotation.unchecked.uncheckedVariance +import scala.concurrent.{ Future, Promise } +import scala.util.{ Failure, Success, Try } +import org.reactivestreams.{ Publisher, Subscriber, Subscription } +import akka.stream.Transformer +import akka.stream.impl.BlackholeSubscriber +import akka.stream.impl2.ActorBasedFlowMaterializer +import java.util.concurrent.atomic.AtomicReference + +/** + * This trait is a marker for a pluggable stream sink. Concrete instances should + * implement [[SinkWithKey]] or [[SimpleSink]], otherwise a custom [[FlowMaterializer]] + * will have to be used to be able to attach them. + * + * All Sinks defined in this package rely upon an [[ActorBasedFlowMaterializer]] being + * made available to them in order to use the attach method. Other + * FlowMaterializers can be used but must then implement the functionality of these + * Sink nodes themselves (or construct an ActorBasedFlowMaterializer). + */ +trait Sink[-Out] + +/** + * A sink that does not need to create a user-accessible object during materialization. + */ +trait SimpleSink[-Out] extends Sink[Out] { + /** + * Attach this sink to the given [[org.reactivestreams.Publisher]]. Using the given + * [[FlowMaterializer]] is completely optional, especially if this sink belongs to + * a different Reactive Streams implementation. It is the responsibility of the + * caller to provide a suitable FlowMaterializer that can be used for running + * Flows if necessary. + * + * @param flowPublisher the Publisher to consume elements from + * @param materializer a FlowMaterializer that may be used for creating flows + * @param flowName the name of the current flow, which should be used in log statements or error messages + */ + def attach(flowPublisher: Publisher[Out @uncheckedVariance], materializer: ActorBasedFlowMaterializer, flowName: String): Unit + /** + * This method is only used for Sinks that return true from [[#isActive]], which then must + * implement it. + */ + def create(materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Out] @uncheckedVariance = + throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true") + /** + * This method indicates whether this Sink can create a Subscriber instead of being + * attached to a Publisher. This is only used if the Flow does not contain any + * operations. + */ + def isActive: Boolean = false +} + +/** + * A sink that will create an object during materialization that the user will need + * to retrieve in order to access aspects of this sink (could be a completion Future + * or a cancellation handle, etc.) + */ +trait SinkWithKey[-Out, T] extends Sink[Out] { + /** + * Attach this sink to the given [[org.reactivestreams.Publisher]]. Using the given + * [[FlowMaterializer]] is completely optional, especially if this sink belongs to + * a different Reactive Streams implementation. It is the responsibility of the + * caller to provide a suitable FlowMaterializer that can be used for running + * Flows if necessary. + * + * @param flowPublisher the Publisher to consume elements from + * @param materializer a FlowMaterializer that may be used for creating flows + * @param flowName the name of the current flow, which should be used in log statements or error messages + */ + def attach(flowPublisher: Publisher[Out @uncheckedVariance], materializer: ActorBasedFlowMaterializer, flowName: String): T + /** + * This method is only used for Sinks that return true from [[#isActive]], which then must + * implement it. + */ + def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Subscriber[Out] @uncheckedVariance, T) = + throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true") + /** + * This method indicates whether this Sink can create a Subscriber instead of being + * attached to a Publisher. This is only used if the Flow does not contain any + * operations. + */ + def isActive: Boolean = false + + // these are unique keys, case class equality would break them + final override def equals(other: Any): Boolean = super.equals(other) + final override def hashCode: Int = super.hashCode +} + +/** + * Holds the downstream-most [[org.reactivestreams.Publisher]] interface of the materialized flow. + * The stream will not have any subscribers attached at this point, which means that after prefetching + * elements to fill the internal buffers it will assert back-pressure until + * a subscriber connects and creates demand for elements to be emitted. + */ +object PublisherSink { + private val instance = new PublisherSink[Nothing] + def apply[T]: PublisherSink[T] = instance.asInstanceOf[PublisherSink[T]] +} + +class PublisherSink[Out]() extends SinkWithKey[Out, Publisher[Out]] { + def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[Out] = flowPublisher + def publisher(m: MaterializedSink): Publisher[Out] = m.getSinkFor(this) + + override def toString: String = "PublisherSink" +} + +/** + * Holds a [[scala.concurrent.Future]] that will be fulfilled with the first + * thing that is signaled to this stream, which can be either an element (after + * which the upstream subscription is canceled), an error condition (putting + * the Future into the corresponding failed state) or the end-of-stream + * (failing the Future with a NoSuchElementException). + */ +object FutureSink { + private val instance = new FutureSink[Nothing] + def apply[T]: FutureSink[T] = instance.asInstanceOf[FutureSink[T]] +} + +class FutureSink[Out] extends SinkWithKey[Out, Future[Out]] { + def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Future[Out] = { + val (sub, f) = create(materializer, flowName) + flowPublisher.subscribe(sub) + f + } + override def isActive = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Subscriber[Out], Future[Out]) = { + val p = Promise[Out]() + val sub = new Subscriber[Out] { // TODO #15804 verify this using the RS TCK + private val sub = new AtomicReference[Subscription] + override def onSubscribe(s: Subscription): Unit = + if (!sub.compareAndSet(null, s)) s.cancel() + else s.request(1) + override def onNext(t: Out): Unit = { p.trySuccess(t); sub.get.cancel() } + override def onError(t: Throwable): Unit = p.tryFailure(t) + override def onComplete(): Unit = p.tryFailure(new NoSuchElementException("empty stream")) + } + (sub, p.future) + } + + def future(m: MaterializedSink): Future[Out] = m.getSinkFor(this) + + override def toString: String = "FutureSink" +} + +/** + * Attaches a subscriber to this stream which will just discard all received + * elements. + */ +final case object BlackholeSink extends SimpleSink[Any] { + override def attach(flowPublisher: Publisher[Any], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + flowPublisher.subscribe(create(materializer, flowName)) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Any] = + new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize) +} + +/** + * Attaches a subscriber to this stream. + */ +final case class SubscriberSink[Out](subscriber: Subscriber[Out]) extends SimpleSink[Out] { + override def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + flowPublisher.subscribe(subscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Out] = subscriber +} + +object OnCompleteSink { + private val SuccessUnit = Success[Unit](()) +} + +/** + * When the flow is completed, either through an error or normal + * completion, apply the provided function with [[scala.util.Success]] + * or [[scala.util.Failure]]. + */ +final case class OnCompleteSink[Out](callback: Try[Unit] ⇒ Unit) extends SimpleSink[Out] { + override def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + FlowFrom(flowPublisher).transform("onCompleteSink", () ⇒ new Transformer[Out, Unit] { + override def onNext(in: Out) = Nil + override def onError(e: Throwable) = { + callback(Failure(e)) + throw e + } + override def onTermination(e: Option[Throwable]) = { + callback(OnCompleteSink.SuccessUnit) + Nil + } + }).consume()(materializer.withNamePrefix(flowName)) +} + +/** + * Invoke the given procedure for each received element. The sink holds a [[scala.concurrent.Future]] + * that will be completed with `Success` when reaching the normal end of the stream, or completed + * with `Failure` if there is an error is signaled in the stream. + */ +final case class ForeachSink[Out](f: Out ⇒ Unit) extends SinkWithKey[Out, Future[Unit]] { + override def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Future[Unit] = { + val promise = Promise[Unit]() + FlowFrom(flowPublisher).transform("foreach", () ⇒ new Transformer[Out, Unit] { + override def onNext(in: Out) = { f(in); Nil } + override def onError(cause: Throwable): Unit = () + override def onTermination(e: Option[Throwable]) = { + e match { + case None ⇒ promise.success(()) + case Some(e) ⇒ promise.failure(e) + } + Nil + } + }).consume()(materializer.withNamePrefix(flowName)) + promise.future + } + def future(m: MaterializedSink): Future[Unit] = m.getSinkFor(this) +} + diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala new file mode 100644 index 0000000000..e3e5f3e1a0 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala @@ -0,0 +1,271 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.annotation.unchecked.uncheckedVariance +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.util.{ Failure, Success } + +import org.reactivestreams.{ Publisher, Subscriber } + +import akka.stream.impl.{ ActorPublisher, EmptyPublisher, ErrorPublisher, FuturePublisher, IterablePublisher, IteratorPublisher, SimpleCallbackPublisher, TickPublisher } +import akka.stream.impl2.ActorBasedFlowMaterializer + +object FlowFrom { + /** + * Helper to create `Flow` without [[Source]]. + * Example usage: `FlowFrom[Int]` + */ + def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil) + + /** + * Helper to create `Flow` with [[Source]] from `Publisher`. + * + * Construct a transformation starting with given publisher. The transformation steps + * are executed by a series of [[org.reactivestreams.Processor]] instances + * that mediate the flow of elements downstream and the propagation of + * back-pressure upstream. + */ + def apply[T](publisher: Publisher[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(PublisherSource(publisher)) + + /** + * Helper to create `Flow` with [[Source]] from `Iterator`. + * Example usage: `FlowFrom(Seq(1,2,3).iterator)` + * + * Start a new `Flow` from the given Iterator. The produced stream of elements + * will continue until the iterator runs empty or fails during evaluation of + * the `next()` method. Elements are pulled out of the iterator + * in accordance with the demand coming from the downstream transformation + * steps. + */ + def apply[T](iterator: Iterator[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IteratorSource(iterator)) + + /** + * Helper to create `Flow` with [[Source]] from `Iterable`. + * Example usage: `FlowFrom(Seq(1,2,3))` + * + * Starts a new `Flow` from the given `Iterable`. This is like starting from an + * Iterator, but every Subscriber directly attached to the Publisher of this + * stream will see an individual flow of elements (always starting from the + * beginning) regardless of when they subscribed. + */ + def apply[T](iterable: immutable.Iterable[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IterableSource(iterable)) + + /** + * Define the sequence of elements to be produced by the given closure. + * The stream ends normally when evaluation of the closure returns a `None`. + * The stream ends exceptionally when an exception is thrown from the closure. + */ + def apply[T](f: () ⇒ Option[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(ThunkSource(f)) + + /** + * Start a new `Flow` from the given `Future`. The stream will consist of + * one element when the `Future` is completed with a successful value, which + * may happen before or after materializing the `Flow`. + * The stream terminates with an error if the `Future` is completed with a failure. + */ + def apply[T](future: Future[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(FutureSource(future)) + + /** + * Elements are produced from the tick closure periodically with the specified interval. + * The tick element will be delivered to downstream consumers that has requested any elements. + * If a consumer has not requested any elements at the point in time when the tick + * element is produced it will not receive that tick element later. It will + * receive new tick elements as soon as it has requested more elements. + */ + def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): FlowWithSource[T, T] = + FlowFrom[T].withSource(TickSource(initialDelay, interval, tick)) + +} + +/** + * This trait is a marker for a pluggable stream source. Concrete instances should + * implement [[SourceWithKey]] or [[SimpleSource]], otherwise a custom [[FlowMaterializer]] + * will have to be used to be able to attach them. + * + * All Sources defined in this package rely upon an ActorBasedFlowMaterializer being + * made available to them in order to use the attach method. Other + * FlowMaterializers can be used but must then implement the functionality of these + * Source nodes themselves (or construct an ActorBasedFlowMaterializer). + */ +trait Source[+In] + +/** + * A source that does not need to create a user-accessible object during materialization. + */ +trait SimpleSource[+In] extends Source[In] { + /** + * Attach this source to the given [[org.reactivestreams.Subscriber]]. Using the given + * [[FlowMaterializer]] is completely optional, especially if this source belongs to + * a different Reactive Streams implementation. It is the responsibility of the + * caller to provide a suitable FlowMaterializer that can be used for running + * Flows if necessary. + * + * @param flowSubscriber the Subscriber to produce elements to + * @param materializer a FlowMaterializer that may be used for creating flows + * @param flowName the name of the current flow, which should be used in log statements or error messages + */ + def attach(flowSubscriber: Subscriber[In] @uncheckedVariance, materializer: ActorBasedFlowMaterializer, flowName: String): Unit + /** + * This method is only used for Sources that return true from [[#isActive]], which then must + * implement it. + */ + def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] @uncheckedVariance = + throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true") + /** + * This method indicates whether this Source can create a Publisher instead of being + * attached to a Subscriber. This is only used if the Flow does not contain any + * operations. + */ + def isActive: Boolean = false +} + +/** + * A source that will create an object during materialization that the user will need + * to retrieve in order to access aspects of this source (could be a Subscriber, a + * Future/Promise, etc.). + */ +trait SourceWithKey[+In, T] extends Source[In] { + /** + * Attach this source to the given [[org.reactivestreams.Subscriber]]. Using the given + * [[FlowMaterializer]] is completely optional, especially if this source belongs to + * a different Reactive Streams implementation. It is the responsibility of the + * caller to provide a suitable FlowMaterializer that can be used for running + * Flows if necessary. + * + * @param flowSubscriber the Subscriber to produce elements to + * @param materializer a FlowMaterializer that may be used for creating flows + * @param flowName the name of the current flow, which should be used in log statements or error messages + */ + def attach(flowSubscriber: Subscriber[In] @uncheckedVariance, materializer: ActorBasedFlowMaterializer, flowName: String): T + /** + * This method is only used for Sources that return true from [[#isActive]], which then must + * implement it. + */ + def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Publisher[In] @uncheckedVariance, T) = + throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true") + /** + * This method indicates whether this Source can create a Publisher instead of being + * attached to a Subscriber. This is only used if the Flow does not contain any + * operations. + */ + def isActive: Boolean = false + + // these are unique keys, case class equality would break them + final override def equals(other: Any): Boolean = super.equals(other) + final override def hashCode: Int = super.hashCode +} + +/** + * Holds a `Subscriber` representing the input side of the flow. + * The `Subscriber` can later be connected to an upstream `Publisher`. + */ +final case class SubscriberSource[In]() extends SourceWithKey[In, Subscriber[In]] { + override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[In] = + flowSubscriber + + def subscriber(m: MaterializedSource): Subscriber[In] = m.getSourceFor(this) +} + +/** + * Construct a transformation starting with given publisher. The transformation steps + * are executed by a series of [[org.reactivestreams.Processor]] instances + * that mediate the flow of elements downstream and the propagation of + * back-pressure upstream. + */ +final case class PublisherSource[In](p: Publisher[In]) extends SimpleSource[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + p.subscribe(flowSubscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = p +} + +/** + * Start a new `Flow` from the given Iterator. The produced stream of elements + * will continue until the iterator runs empty or fails during evaluation of + * the `next()` method. Elements are pulled out of the iterator + * in accordance with the demand coming from the downstream transformation + * steps. + */ +final case class IteratorSource[In](iterator: Iterator[In]) extends SimpleSource[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + create(materializer, flowName).subscribe(flowSubscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = + if (iterator.isEmpty) EmptyPublisher[In] + else ActorPublisher[In](materializer.actorOf(IteratorPublisher.props(iterator, materializer.settings), + name = s"$flowName-0-iterator")) +} + +/** + * Starts a new `Flow` from the given `Iterable`. This is like starting from an + * Iterator, but every Subscriber directly attached to the Publisher of this + * stream will see an individual flow of elements (always starting from the + * beginning) regardless of when they subscribed. + */ +final case class IterableSource[In](iterable: immutable.Iterable[In]) extends SimpleSource[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + create(materializer, flowName).subscribe(flowSubscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = + if (iterable.isEmpty) EmptyPublisher[In] + else ActorPublisher[In](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings), + name = s"$flowName-0-iterable"), Some(iterable)) +} + +/** + * Define the sequence of elements to be produced by the given closure. + * The stream ends normally when evaluation of the closure returns a `None`. + * The stream ends exceptionally when an exception is thrown from the closure. + */ +final case class ThunkSource[In](f: () ⇒ Option[In]) extends SimpleSource[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + create(materializer, flowName).subscribe(flowSubscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = + ActorPublisher[In](materializer.actorOf(SimpleCallbackPublisher.props(materializer.settings, f), + name = s"$flowName-0-thunk")) +} + +/** + * Start a new `Flow` from the given `Future`. The stream will consist of + * one element when the `Future` is completed with a successful value, which + * may happen before or after materializing the `Flow`. + * The stream terminates with an error if the `Future` is completed with a failure. + */ +final case class FutureSource[In](future: Future[In]) extends SimpleSource[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + create(materializer, flowName).subscribe(flowSubscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = + future.value match { + case Some(Success(element)) ⇒ + ActorPublisher[In](materializer.actorOf(IterablePublisher.props(List(element), materializer.settings), + name = s"$flowName-0-future"), Some(future)) + case Some(Failure(t)) ⇒ + ErrorPublisher(t).asInstanceOf[Publisher[In]] + case None ⇒ + ActorPublisher[In](materializer.actorOf(FuturePublisher.props(future, materializer.settings), + name = s"$flowName-0-future"), Some(future)) + } +} + +/** + * Elements are produced from the tick closure periodically with the specified interval. + * The tick element will be delivered to downstream consumers that has requested any elements. + * If a consumer has not requested any elements at the point in time when the tick + * element is produced it will not receive that tick element later. It will + * receive new tick elements as soon as it has requested more elements. + */ +final case class TickSource[In](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ In) extends SimpleSource[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + create(materializer, flowName).subscribe(flowSubscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = + ActorPublisher[In](materializer.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings), + name = s"$flowName-0-tick")) +} + diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala index 399f402cc9..a7306a2127 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2013 Typesafe Inc. + * Copyright (C) 2009-2014 Typesafe Inc. */ package akka.stream diff --git a/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala index 5f59df319f..6d5a747d09 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2013 Typesafe Inc. + * Copyright (C) 2009-2014 Typesafe Inc. */ package akka.stream diff --git a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala index a89b199edb..98044f7002 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2013 Typesafe Inc. + * Copyright (C) 2009-2014 Typesafe Inc. */ package akka.stream diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowForeachSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowForeachSpec.scala new file mode 100644 index 0000000000..6829abd061 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowForeachSpec.scala @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } + +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } +import scala.util.control.NoStackTrace + +class FlowForeachSpec extends AkkaSpec { + + implicit val mat = FlowMaterializer() + import system.dispatcher + + "A Foreach" must { + + "call the procedure for each element" in { + val foreachSink = ForeachSink[Int](testActor ! _) + val mf = FlowFrom(1 to 3).withSink(foreachSink).run() + foreachSink.future(mf).onSuccess { + case _ ⇒ testActor ! "done" + } + expectMsg(1) + expectMsg(2) + expectMsg(3) + expectMsg("done") + } + + "complete the future for an empty stream" in { + val foreachSink = ForeachSink[Int](testActor ! _) + val mf = FlowFrom(Nil).withSink(foreachSink).run() + foreachSink.future(mf).onSuccess { + case _ ⇒ testActor ! "done" + } + expectMsg("done") + } + + "yield the first error" in { + val p = StreamTestKit.PublisherProbe[Int]() + val foreachSink = ForeachSink[Int](testActor ! _) + val mf = FlowFrom(p).withSink(foreachSink).run() + foreachSink.future(mf).onFailure { + case ex ⇒ testActor ! ex + } + val proc = p.expectSubscription + proc.expectRequest() + val ex = new RuntimeException("ex") with NoStackTrace + proc.sendError(ex) + expectMsg(ex) + } + + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowFromFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowFromFutureSpec.scala new file mode 100644 index 0000000000..89daaf48d2 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowFromFutureSpec.scala @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import scala.concurrent.{ Future, Promise } +import scala.concurrent.duration._ +import akka.stream.MaterializerSettings +import scala.util.control.NoStackTrace + +class FlowFromFutureSpec extends AkkaSpec { + + val settings = MaterializerSettings(system) + + implicit val materializer = FlowMaterializer(settings) + + "A Flow based on a Future" must { + "produce one element from already successful Future" in { + val p = FlowFrom(Future.successful(1)).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + c.expectNoMsg(100.millis) + sub.request(1) + c.expectNext(1) + c.expectComplete() + } + + "produce error from already failed Future" in { + val ex = new RuntimeException("test") with NoStackTrace + val p = FlowFrom(Future.failed[Int](ex)).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + c.expectError(ex) + } + + "produce one element when Future is completed" in { + val promise = Promise[Int]() + val p = FlowFrom(promise.future).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(1) + c.expectNoMsg(100.millis) + promise.success(1) + c.expectNext(1) + c.expectComplete() + c.expectNoMsg(100.millis) + } + + "produce one element when Future is completed but not before request" in { + val promise = Promise[Int]() + val p = FlowFrom(promise.future).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + promise.success(1) + c.expectNoMsg(200.millis) + sub.request(1) + c.expectNext(1) + c.expectComplete() + } + + "produce elements with multiple subscribers" in { + val promise = Promise[Int]() + val p = FlowFrom(promise.future).toPublisher() + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c1) + p.subscribe(c2) + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + sub1.request(1) + promise.success(1) + sub2.request(2) + c1.expectNext(1) + c2.expectNext(1) + c1.expectComplete() + c2.expectComplete() + } + + "produce elements to later subscriber" in { + val promise = Promise[Int]() + val p = FlowFrom(promise.future).toPublisher() + val keepAlive = StreamTestKit.SubscriberProbe[Int]() + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(keepAlive) + p.subscribe(c1) + + val sub1 = c1.expectSubscription() + sub1.request(1) + promise.success(1) + c1.expectNext(1) + c1.expectComplete() + p.subscribe(c2) + val sub2 = c2.expectSubscription() + sub2.request(1) + c2.expectNext(1) + c2.expectComplete() + } + + "allow cancel before receiving element" in { + val promise = Promise[Int]() + val p = FlowFrom(promise.future).toPublisher() + val keepAlive = StreamTestKit.SubscriberProbe[Int]() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(keepAlive) + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(1) + sub.cancel() + c.expectNoMsg(500.millis) + promise.success(1) + c.expectNoMsg(200.millis) + } + } +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowIterableSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowIterableSpec.scala new file mode 100644 index 0000000000..11281b9ebd --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowIterableSpec.scala @@ -0,0 +1,154 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.testkit.StreamTestKit.{ OnComplete, OnError, OnNext } +import scala.concurrent.duration._ +import akka.stream.MaterializerSettings + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowIterableSpec extends AkkaSpec { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 512) + + implicit val materializer = FlowMaterializer(settings) + + "A Flow based on an iterable" must { + "produce elements" in { + val p = FlowFrom(List(1, 2, 3)).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(1) + c.expectNext(1) + c.expectNoMsg(100.millis) + sub.request(2) + c.expectNext(2) + c.expectNext(3) + c.expectComplete() + } + + "complete empty" in { + val p = FlowFrom(List.empty[Int]).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + c.expectComplete() + c.expectNoMsg(100.millis) + + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c2) + c2.expectComplete() + } + + "produce elements with multiple subscribers" in { + val p = FlowFrom(List(1, 2, 3)).toPublisher() + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c1) + p.subscribe(c2) + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + sub1.request(1) + sub2.request(2) + c1.expectNext(1) + c2.expectNext(1) + c2.expectNext(2) + c1.expectNoMsg(100.millis) + c2.expectNoMsg(100.millis) + sub1.request(2) + sub2.request(2) + c1.expectNext(2) + c1.expectNext(3) + c2.expectNext(3) + c1.expectComplete() + c2.expectComplete() + } + + "produce elements to later subscriber" in { + val p = FlowFrom(List(1, 2, 3)).toPublisher() + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c1) + + val sub1 = c1.expectSubscription() + sub1.request(1) + c1.expectNext(1) + c1.expectNoMsg(100.millis) + p.subscribe(c2) + val sub2 = c2.expectSubscription() + sub2.request(2) + // starting from first element, new iterator per subscriber + c2.expectNext(1) + c2.expectNext(2) + c2.expectNoMsg(100.millis) + sub2.request(1) + c2.expectNext(3) + c2.expectComplete() + sub1.request(2) + c1.expectNext(2) + c1.expectNext(3) + c1.expectComplete() + } + + "produce elements with one transformation step" in { + val p = FlowFrom(List(1, 2, 3)).map(_ * 2).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(10) + c.expectNext(2) + c.expectNext(4) + c.expectNext(6) + c.expectComplete() + } + + "produce elements with two transformation steps" ignore { + // val p = FlowFrom(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toPublisher() + // val c = StreamTestKit.SubscriberProbe[Int]() + // p.subscribe(c) + // val sub = c.expectSubscription() + // sub.request(10) + // c.expectNext(4) + // c.expectNext(8) + // c.expectComplete() + } + + "allow cancel before receiving all elements" in { + val count = 100000 + val p = FlowFrom(1 to count).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(count) + c.expectNext(1) + sub.cancel() + val got = c.probe.receiveWhile(3.seconds) { + case _: OnNext[_] ⇒ + case OnComplete ⇒ fail("Cancel expected before OnComplete") + case OnError(e) ⇒ fail(e) + } + got.size should be < (count - 1) + } + + "have value equality of publisher" in { + val p1 = FlowFrom(List(1, 2, 3)).toPublisher() + val p2 = FlowFrom(List(1, 2, 3)).toPublisher() + p1 should be(p2) + p2 should be(p1) + val p3 = FlowFrom(List(1, 2, 3, 4)).toPublisher() + p1 should not be (p3) + p3 should not be (p1) + val p4 = FlowFrom(Vector.empty[String]).toPublisher() + val p5 = FlowFrom(Set.empty[String]).toPublisher() + p1 should not be (p4) + p4 should be(p5) + p5 should be(p4) + val p6 = FlowFrom(List(1, 2, 3).iterator).toPublisher() + p1 should not be (p6) + p6 should not be (p1) + } + } +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowIteratorSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowIteratorSpec.scala new file mode 100644 index 0000000000..6db382b711 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowIteratorSpec.scala @@ -0,0 +1,139 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.concurrent.duration._ +import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit.OnNext +import akka.stream.testkit.StreamTestKit.OnComplete +import akka.stream.testkit.StreamTestKit.OnError +import akka.stream.MaterializerSettings + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowIteratorSpec extends AkkaSpec { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 4, maxSize = 4) + + implicit val materializer = FlowMaterializer(settings) + + "A Flow based on an iterator" must { + "produce elements" in { + val p = FlowFrom(List(1, 2, 3).iterator).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(1) + c.expectNext(1) + c.expectNoMsg(100.millis) + sub.request(3) + c.expectNext(2) + c.expectNext(3) + c.expectComplete() + } + + "complete empty" in { + val p = FlowFrom(List.empty[Int].iterator).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + c.expectComplete() + c.expectNoMsg(100.millis) + + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c2) + c2.expectComplete() + } + + "produce elements with multiple subscribers" in { + val p = FlowFrom(List(1, 2, 3).iterator).toPublisher() + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c1) + p.subscribe(c2) + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + sub1.request(1) + sub2.request(2) + c1.expectNext(1) + c2.expectNext(1) + c2.expectNext(2) + c1.expectNoMsg(100.millis) + c2.expectNoMsg(100.millis) + sub1.request(2) + sub2.request(2) + c1.expectNext(2) + c1.expectNext(3) + c2.expectNext(3) + c1.expectComplete() + c2.expectComplete() + } + + "produce elements to later subscriber" in { + val p = FlowFrom(List(1, 2, 3).iterator).toPublisher() + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c1) + + val sub1 = c1.expectSubscription() + sub1.request(1) + c1.expectNext(1) + c1.expectNoMsg(100.millis) + p.subscribe(c2) + val sub2 = c2.expectSubscription() + sub2.request(3) + // element 1 is already gone + c2.expectNext(2) + c2.expectNext(3) + c2.expectComplete() + sub1.request(3) + c1.expectNext(2) + c1.expectNext(3) + c1.expectComplete() + } + + "produce elements with one transformation step" in { + val p = FlowFrom(List(1, 2, 3).iterator).map(_ * 2).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(10) + c.expectNext(2) + c.expectNext(4) + c.expectNext(6) + c.expectComplete() + } + + // FIXME enable test when filter is implemented + "produce elements with two transformation steps" ignore { + // val p = FlowFrom(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toPublisher() + // val c = StreamTestKit.SubscriberProbe[Int]() + // p.subscribe(c) + // val sub = c.expectSubscription() + // sub.request(10) + // c.expectNext(4) + // c.expectNext(8) + // c.expectComplete() + } + + "allow cancel before receiving all elements" in { + val count = 100000 + val p = FlowFrom((1 to count).iterator).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(count) + c.expectNext(1) + sub.cancel() + val got = c.probe.receiveWhile(3.seconds) { + case _: OnNext[_] ⇒ + case OnComplete ⇒ fail("Cancel expected before OnComplete") + case OnError(e) ⇒ fail(e) + } + got.size should be < (count - 1) + } + + } +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowOnCompleteSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowOnCompleteSpec.scala new file mode 100644 index 0000000000..7528d40cc7 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowOnCompleteSpec.scala @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.testkit.{ AkkaSpec, ScriptedTest, StreamTestKit } +import akka.testkit.TestProbe +import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } +import scala.util.control.NoStackTrace +import scala.util.{ Failure, Success } +import akka.stream.MaterializerSettings + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) + + implicit val materializer = FlowMaterializer(settings) + + "A Flow with onComplete" must { + + "invoke callback on normal completion" in { + val onCompleteProbe = TestProbe() + val p = StreamTestKit.PublisherProbe[Int]() + FlowFrom(p).withSink(OnCompleteSink(onCompleteProbe.ref ! _)).run() + val proc = p.expectSubscription + proc.expectRequest() + proc.sendNext(42) + onCompleteProbe.expectNoMsg(100.millis) + proc.sendComplete() + onCompleteProbe.expectMsg(Success(())) + } + + "yield the first error" in { + val onCompleteProbe = TestProbe() + val p = StreamTestKit.PublisherProbe[Int]() + FlowFrom(p).withSink(OnCompleteSink(onCompleteProbe.ref ! _)).run() + val proc = p.expectSubscription + proc.expectRequest() + val ex = new RuntimeException("ex") with NoStackTrace + proc.sendError(ex) + onCompleteProbe.expectMsg(Failure(ex)) + onCompleteProbe.expectNoMsg(100.millis) + } + + "invoke callback for an empty stream" in { + val onCompleteProbe = TestProbe() + val p = StreamTestKit.PublisherProbe[Int]() + FlowFrom(p).withSink(OnCompleteSink(onCompleteProbe.ref ! _)).run() + val proc = p.expectSubscription + proc.expectRequest() + proc.sendComplete() + onCompleteProbe.expectMsg(Success(())) + onCompleteProbe.expectNoMsg(100.millis) + } + + "invoke callback after transform and foreach steps " in { + val onCompleteProbe = TestProbe() + val p = StreamTestKit.PublisherProbe[Int]() + import system.dispatcher // for the Future.onComplete + val foreachSink = ForeachSink[Int] { + x ⇒ onCompleteProbe.ref ! ("foreach-" + x) + } + val mf = FlowFrom(p).map { x ⇒ + onCompleteProbe.ref ! ("map-" + x) + x + }.withSink(foreachSink).run() + foreachSink.future(mf) onComplete { onCompleteProbe.ref ! _ } + val proc = p.expectSubscription + proc.expectRequest() + proc.sendNext(42) + proc.sendComplete() + onCompleteProbe.expectMsg("map-42") + onCompleteProbe.expectMsg("foreach-42") + onCompleteProbe.expectMsg(Success(())) + } + + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowPublishToSubscriberSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowPublishToSubscriberSpec.scala new file mode 100644 index 0000000000..45c4e5fe12 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowPublishToSubscriberSpec.scala @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import akka.stream.MaterializerSettings + +class FlowPublishToSubscriberSpec extends AkkaSpec { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) + + implicit val materializer = FlowMaterializer(settings) + + "A Flow with SubscriberSink" must { + + "publish elements to the subscriber" in { + val c = StreamTestKit.SubscriberProbe[Int]() + FlowFrom(List(1, 2, 3)).withSink(SubscriberSink(c)).run() + val s = c.expectSubscription() + s.request(3) + c.expectNext(1) + c.expectNext(2) + c.expectNext(3) + c.expectComplete() + } + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala index ec261a4a6a..093e8adc7a 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala @@ -11,30 +11,30 @@ import scala.concurrent.Future class FlowSpec extends AkkaSpec { - val intSeq = IterableIn(Seq(1, 2, 3)) - val strSeq = IterableIn(Seq("a", "b", "c")) + val intSeq = IterableSource(Seq(1, 2, 3)) + val strSeq = IterableSource(Seq("a", "b", "c")) import scala.concurrent.ExecutionContext.Implicits.global - val intFut = FutureIn(Future { 3 }) + val intFut = FutureSource(Future { 3 }) implicit val materializer = FlowMaterializer(MaterializerSettings(system)) "ProcessorFlow" should { "go through all states" in { val f: ProcessorFlow[Int, Int] = FlowFrom[Int] - .withInput(intSeq) - .withOutput(PublisherOut()) - .withoutInput - .withoutOutput + .withSource(intSeq) + .withSink(PublisherSink[Int]) + .withoutSource + .withoutSink } "should not run" in { val open: ProcessorFlow[Int, Int] = FlowFrom[Int] "open.run()" shouldNot compile } - "accept IterableIn" in { - val f: PublisherFlow[Int, Int] = FlowFrom[Int].withInput(intSeq) + "accept IterableSource" in { + val f: FlowWithSource[Int, Int] = FlowFrom[Int].withSource(intSeq) } - "accept FutureIn" in { - val f: PublisherFlow[Int, Int] = FlowFrom[Int].withInput(intFut) + "accept FutureSource" in { + val f: FlowWithSource[Int, Int] = FlowFrom[Int].withSource(intFut) } "append ProcessorFlow" in { val open1: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) @@ -42,14 +42,14 @@ class FlowSpec extends AkkaSpec { val open3: ProcessorFlow[Int, Int] = open1.append(open2) "open3.run()" shouldNot compile - val closedInput: PublisherFlow[Int, Int] = open3.withInput(intSeq) - "closedInput.run()" shouldNot compile + val closedSource: FlowWithSource[Int, Int] = open3.withSource(intSeq) + "closedSource.run()" shouldNot compile - val closedOutput: SubscriberFlow[Int, Int] = open3.withOutput(PublisherOut()) - "closedOutput.run()" shouldNot compile + val closedSink: FlowWithSink[Int, Int] = open3.withSink(PublisherSink[Int]) + "closedSink.run()" shouldNot compile - closedInput.withOutput(PublisherOut()).run() - closedOutput.withInput(intSeq).run() + closedSource.withSink(PublisherSink[Int]).run() + closedSink.withSource(intSeq).run() } "prepend ProcessorFlow" in { val open1: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) @@ -57,89 +57,89 @@ class FlowSpec extends AkkaSpec { val open3: ProcessorFlow[String, String] = open1.prepend(open2) "open3.run()" shouldNot compile - val closedInput: PublisherFlow[String, String] = open3.withInput(strSeq) - "closedInput.run()" shouldNot compile + val closedSource: FlowWithSource[String, String] = open3.withSource(strSeq) + "closedSource.run()" shouldNot compile - val closedOutput: SubscriberFlow[String, String] = open3.withOutput(PublisherOut()) - "closedOutput.run()" shouldNot compile + val closedSink: FlowWithSink[String, String] = open3.withSink(PublisherSink[String]) + "closedSink.run()" shouldNot compile - closedInput.withOutput(PublisherOut()).run - closedOutput.withInput(strSeq).run + closedSource.withSink(PublisherSink[String]).run + closedSink.withSource(strSeq).run } - "append SubscriberFlow" in { + "append FlowWithSink" in { val open: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) - val closedOutput: SubscriberFlow[String, Int] = FlowFrom[String].map(_.hashCode).withOutput(PublisherOut()) - val appended: SubscriberFlow[Int, Int] = open.append(closedOutput) + val closedSink: FlowWithSink[String, Int] = FlowFrom[String].map(_.hashCode).withSink(PublisherSink[Int]) + val appended: FlowWithSink[Int, Int] = open.append(closedSink) "appended.run()" shouldNot compile "appended.toFuture" shouldNot compile - appended.withInput(intSeq).run + appended.withSource(intSeq).run } - "prepend PublisherFlow" in { + "prepend FlowWithSource" in { val open: ProcessorFlow[Int, String] = FlowFrom[Int].map(_.toString) - val closedInput: PublisherFlow[String, Int] = FlowFrom[String].map(_.hashCode).withInput(strSeq) - val prepended: PublisherFlow[String, String] = open.prepend(closedInput) + val closedSource: FlowWithSource[String, Int] = FlowFrom[String].map(_.hashCode).withSource(strSeq) + val prepended: FlowWithSource[String, String] = open.prepend(closedSource) "prepended.run()" shouldNot compile - "prepended.withInput(strSeq)" shouldNot compile - prepended.withOutput(PublisherOut()).run + "prepended.withSource(strSeq)" shouldNot compile + prepended.withSink(PublisherSink[String]).run } } - "SubscriberFlow" should { - val openInput: SubscriberFlow[Int, String] = - FlowFrom[Int].map(_.toString).withOutput(PublisherOut()) - "accept Input" in { - openInput.withInput(intSeq) + "FlowWithSink" should { + val openSource: FlowWithSink[Int, String] = + FlowFrom[Int].map(_.toString).withSink(PublisherSink[String]) + "accept Source" in { + openSource.withSource(intSeq) } - "drop Output" in { - openInput.withoutOutput + "drop Sink" in { + openSource.withoutSink } - "not drop Input" in { - "openInput.withoutInput" shouldNot compile + "not drop Source" in { + "openSource.withoutSource" shouldNot compile } - "not accept Output" in { - "openInput.ToFuture" shouldNot compile + "not accept Sink" in { + "openSource.ToFuture" shouldNot compile } "not run()" in { - "openInput.run()" shouldNot compile + "openSource.run()" shouldNot compile } } - "PublisherFlow" should { - val openOutput: PublisherFlow[Int, String] = + "FlowWithSource" should { + val openSink: FlowWithSource[Int, String] = FlowFrom(Seq(1, 2, 3)).map(_.toString) - "accept Output" in { - openOutput.withOutput(PublisherOut()) + "accept Sink" in { + openSink.withSink(PublisherSink[String]) } - "drop Input" in { - openOutput.withoutInput + "drop Source" in { + openSink.withoutSource } - "not drop Output" in { - "openOutput.withoutOutput" shouldNot compile + "not drop Sink" in { + "openSink.withoutSink" shouldNot compile } - "not accept Input" in { - "openOutput.withInput(intSeq)" shouldNot compile + "not accept Source" in { + "openSink.withSource(intSeq)" shouldNot compile } "not run()" in { - "openOutput.run()" shouldNot compile + "openSink.run()" shouldNot compile } } "RunnableFlow" should { val closed: RunnableFlow[Int, String] = - FlowFrom(Seq(1, 2, 3)).map(_.toString).withOutput(PublisherOut()) + FlowFrom(Seq(1, 2, 3)).map(_.toString).withSink(PublisherSink[String]) "run" in { closed.run() } - "drop Input" in { - closed.withoutInput + "drop Source" in { + closed.withoutSource } - "drop Output" in { - closed.withoutOutput + "drop Sink" in { + closed.withoutSink } - "not accept Input" in { - "closed.withInput(intSeq)" shouldNot compile + "not accept Source" in { + "closed.withSource(intSeq)" shouldNot compile } - "not accept Output" in { + "not accept Sink" in { "closed.ToFuture" shouldNot compile } } diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowToFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowToFutureSpec.scala new file mode 100644 index 0000000000..e9da74ef59 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowToFutureSpec.scala @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.testkit.{ AkkaSpec, ScriptedTest, StreamTestKit } +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } +import scala.util.Failure +import akka.stream.MaterializerSettings + +class FlowToFutureSpec extends AkkaSpec with ScriptedTest { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) + + implicit val materializer = FlowMaterializer(settings) + + "A Flow with toFuture" must { + + "yield the first value" in { + val p = StreamTestKit.PublisherProbe[Int]() + val f = FutureSink[Int] + val m = FlowFrom(p).withSink(f).run() + val proc = p.expectSubscription + proc.expectRequest() + proc.sendNext(42) + Await.result(f.future(m), 100.millis) should be(42) + proc.expectCancellation() + } + + "yield the first value when actively constructing" in { + val p = StreamTestKit.PublisherProbe[Int]() + val f = FutureSink[Int] + val s = SubscriberSource[Int] + val m = FlowFrom[Int].withSource(s).withSink(f).run() + p.subscribe(s.subscriber(m)) + val proc = p.expectSubscription + proc.expectRequest() + proc.sendNext(42) + Await.result(f.future(m), 100.millis) should be(42) + proc.expectCancellation() + } + + "yield the first error" in { + val p = StreamTestKit.PublisherProbe[Int]() + val f = FutureSink[Int] + val m = FlowFrom(p).withSink(f).run() + val proc = p.expectSubscription + proc.expectRequest() + val ex = new RuntimeException("ex") + proc.sendError(ex) + val future = f.future(m) + Await.ready(future, 100.millis) + future.value.get should be(Failure(ex)) + } + + "yield NoSuchElementExcption for empty stream" in { + val p = StreamTestKit.PublisherProbe[Int]() + val f = FutureSink[Int] + val m = FlowFrom(p).withSink(f).run() + val proc = p.expectSubscription + proc.expectRequest() + proc.sendComplete() + val future = f.future(m) + Await.ready(future, 100.millis) + future.value.get match { + case Failure(e: NoSuchElementException) ⇒ e.getMessage() should be("empty stream") + case x ⇒ fail("expected NoSuchElementException, got " + x) + } + } + + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala index fb25ddb227..a182038769 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala @@ -23,7 +23,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "A Flow with transform operations" must { "produce one-to-one transformation as expected" in { - val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List(1, 2, 3)).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 @@ -32,7 +32,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d List(tot) } }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -46,7 +46,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "produce one-to-several transformation as expected" in { - val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List(1, 2, 3)).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 @@ -55,7 +55,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d Vector.fill(elem)(tot) } }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -72,7 +72,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "produce dropping transformation as expected" in { - val p = FlowFrom(List(1, 2, 3, 4)).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List(1, 2, 3, 4)).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 @@ -85,7 +85,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } } }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -99,7 +99,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "produce multi-step transformation as expected" in { - val p = FlowFrom(List("a", "bc", "def")).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List("a", "bc", "def")).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[String, Int] { var concat = "" @@ -115,7 +115,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d List(tot) } }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val c1 = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c1) val sub1 = c1.expectSubscription() @@ -138,7 +138,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "invoke onComplete when done" in { - val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List("a")).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[String, String] { var s = "" @@ -148,7 +148,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } override def onTermination(e: Option[Throwable]) = List(s + "B") }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val c = StreamTestKit.SubscriberProbe[String]() p2.subscribe(c) val s = c.expectSubscription() @@ -159,7 +159,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "invoke cleanup when done" in { val cleanupProbe = TestProbe() - val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List("a")).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[String, String] { var s = "" @@ -170,7 +170,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d override def onTermination(e: Option[Throwable]) = List(s + "B") override def cleanup() = cleanupProbe.ref ! s }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val c = StreamTestKit.SubscriberProbe[String]() p2.subscribe(c) val s = c.expectSubscription() @@ -182,7 +182,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "invoke cleanup when done consume" in { val cleanupProbe = TestProbe() - val p = FlowFrom(List("a")).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List("a")).toPublisher() FlowFrom(p). transform("transform", () ⇒ new Transformer[String, String] { var s = "x" @@ -192,13 +192,13 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } override def cleanup() = cleanupProbe.ref ! s }). - withOutput(BlackholeOut()).run() + withSink(BlackholeSink).run() cleanupProbe.expectMsg("a") } "invoke cleanup when done after error" in { val cleanupProbe = TestProbe() - val p = FlowFrom(List("a", "b", "c")).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List("a", "b", "c")).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[String, String] { var s = "" @@ -214,7 +214,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d override def onTermination(e: Option[Throwable]) = List(s + "B") override def cleanup() = cleanupProbe.ref ! s }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val c = StreamTestKit.SubscriberProbe[String]() p2.subscribe(c) val s = c.expectSubscription() @@ -236,7 +236,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } override def isComplete = s == "1" }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val proc = p.expectSubscription val c = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c) @@ -263,7 +263,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d override def onTermination(e: Option[Throwable]) = List(s.length + 10) override def cleanup() = cleanupProbe.ref ! s }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val proc = p.expectSubscription val c = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c) @@ -279,7 +279,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "report error when exception is thrown" in { - val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List(1, 2, 3)).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = { @@ -290,7 +290,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } } }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -304,12 +304,12 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "support cancel as expected" in { - val p = FlowFrom(List(1, 2, 3)).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List(1, 2, 3)).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = List(elem, elem) }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -323,13 +323,13 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "support producing elements from empty inputs" in { - val p = FlowFrom(List.empty[Int]).withOutput(PublisherOut()).toPublisher() + val p = FlowFrom(List.empty[Int]).toPublisher() val p2 = FlowFrom(p). transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = Nil override def onTermination(e: Option[Throwable]) = List(1, 2, 3) }). - withOutput(PublisherOut()).toPublisher() + toPublisher() val subscriber = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(subscriber) val subscription = subscriber.expectSubscription() @@ -363,7 +363,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d case _ ⇒ Nil } } - }).withOutput(PublisherOut()).produceTo(subscriber) + }).publishTo(subscriber) val subscription = subscriber.expectSubscription() subscription.request(10) @@ -383,16 +383,16 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d count += 1 List(count) } - }).withOutput(PublisherOut()) + }) val s1 = StreamTestKit.SubscriberProbe[Int]() - flow.produceTo(s1) + flow.publishTo(s1) s1.expectSubscription().request(3) s1.expectNext(1, 2, 3) s1.expectComplete() val s2 = StreamTestKit.SubscriberProbe[Int]() - flow.produceTo(s2) + flow.publishTo(s2) s2.expectSubscription().request(3) s2.expectNext(1, 2, 3) s2.expectComplete() diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/TickPublisherSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/TickPublisherSpec.scala new file mode 100644 index 0000000000..a7fbe39f94 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/TickPublisherSpec.scala @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.concurrent.duration._ +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import scala.util.control.NoStackTrace + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class TickPublisherSpec extends AkkaSpec { + + implicit val materializer = FlowMaterializer() + + "A Flow based on tick publisher" must { + "produce ticks" in { + val tickGen = Iterator from 1 + val c = StreamTestKit.SubscriberProbe[String]() + FlowFrom(1.second, 500.millis, () ⇒ "tick-" + tickGen.next()).publishTo(c) + val sub = c.expectSubscription() + sub.request(3) + c.expectNoMsg(600.millis) + c.expectNext("tick-1") + c.expectNoMsg(200.millis) + c.expectNext("tick-2") + c.expectNoMsg(200.millis) + c.expectNext("tick-3") + sub.cancel() + c.expectNoMsg(200.millis) + } + + "drop ticks when not requested" in { + val tickGen = Iterator from 1 + val c = StreamTestKit.SubscriberProbe[String]() + FlowFrom(1.second, 1.second, () ⇒ "tick-" + tickGen.next()).publishTo(c) + val sub = c.expectSubscription() + sub.request(2) + c.expectNext("tick-1") + c.expectNoMsg(200.millis) + c.expectNext("tick-2") + c.expectNoMsg(1400.millis) + sub.request(2) + c.expectNext("tick-4") + c.expectNoMsg(200.millis) + c.expectNext("tick-5") + sub.cancel() + c.expectNoMsg(200.millis) + } + + "produce ticks with multiple subscribers" in { + val tickGen = Iterator from 1 + val p = FlowFrom(1.second, 1.second, () ⇒ "tick-" + tickGen.next()).toPublisher() + val c1 = StreamTestKit.SubscriberProbe[String]() + val c2 = StreamTestKit.SubscriberProbe[String]() + p.subscribe(c1) + p.subscribe(c2) + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + sub1.request(1) + sub2.request(2) + c1.expectNext("tick-1") + c2.expectNext("tick-1") + c2.expectNoMsg(200.millis) + c2.expectNext("tick-2") + c1.expectNoMsg(200.millis) + sub1.request(2) + sub2.request(2) + c1.expectNext("tick-3") + c2.expectNext("tick-3") + sub1.cancel() + sub2.cancel() + } + + "signal onError when tick closure throws" in { + val c = StreamTestKit.SubscriberProbe[String]() + FlowFrom(1.second, 1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace).publishTo(c) + val sub = c.expectSubscription() + sub.request(3) + c.expectError.getMessage should be("tick err") + } + + // FIXME enable this test again when zip is back + "be usable with zip for a simple form of rate limiting" ignore { + // val c = StreamTestKit.SubscriberProbe[Int]() + // val rate = FlowFrom(1.second, 1.second, () ⇒ "tick").toPublisher() + // FlowFrom(1 to 100).zip(rate).map { case (n, _) ⇒ n }.publishTo(c) + // val sub = c.expectSubscription() + // sub.request(1000) + // c.expectNext(1) + // c.expectNoMsg(200.millis) + // c.expectNext(2) + // c.expectNoMsg(200.millis) + // sub.cancel() + } + + } +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala index 29afcd49e7..156cc2a3e4 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala @@ -25,7 +25,7 @@ object StreamTestKit { */ def errorPublisher[T](cause: Throwable): Publisher[T] = ErrorPublisher(cause: Throwable).asInstanceOf[Publisher[T]] - def emptyPublisher[T](): Publisher[T] = EmptyPublisher.asInstanceOf[Publisher[T]] + def emptyPublisher[T](): Publisher[T] = EmptyPublisher[T] /** * Subscribes the subscriber and signals error after the first request.