diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala index e1e320af25..083c0744f0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -17,6 +17,7 @@ import akka.stream.scaladsl2.FlowMaterializer import akka.stream.MaterializerSettings import akka.stream.impl.ActorPublisher import akka.stream.impl.IterablePublisher +import akka.stream.impl.IteratorPublisher import akka.stream.impl.TransformProcessorImpl import akka.stream.impl.ActorProcessor import akka.stream.impl.ExposedPublisher @@ -25,6 +26,15 @@ import akka.stream.scaladsl2.Sink import akka.stream.scaladsl2.MaterializedFlow import akka.stream.scaladsl2.IterableSource import akka.stream.impl.EmptyPublisher +import akka.stream.scaladsl2.IteratorSource +import akka.stream.scaladsl2.PublisherSource +import akka.stream.scaladsl2.ThunkSource +import akka.stream.impl.SimpleCallbackPublisher +import akka.stream.scaladsl2.FutureSource +import akka.stream.impl.FuturePublisher +import akka.stream.impl.ErrorPublisher +import akka.stream.impl.TickPublisher +import akka.stream.scaladsl2.TickSource /** * INTERNAL API @@ -69,19 +79,36 @@ private[akka] case class ActorBasedFlowMaterializer( // Ops come in reverse order override def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow = { val flowName = createFlowName() - val (s, p) = - if (ops.isEmpty) { - val identityProcessor: Processor[In, Out] = processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[In, Out]] - (identityProcessor, identityProcessor) - } else { - val opsSize = ops.size - val outProcessor = processorForNode(ops.head, flowName, opsSize).asInstanceOf[Processor[In, Out]] - val topSubscriber = processorChain(outProcessor, ops.tail, flowName, opsSize - 1).asInstanceOf[Processor[In, Out]] - (topSubscriber, outProcessor) - } - val sourceValue = source.attach(s, this, flowName) - val sinkValue = sink.attach(p, this) - new MaterializedFlow(source, sourceValue, sink, sinkValue) + + // FIXME specialcasing, otherwise some tests fail in FlowIterableSpec due to the injected identityProcessor: + // - "have value equality of publisher" + // - "produce elements to later subscriber" + def specialCase: PartialFunction[Source[In], Publisher[Out]] = { + case PublisherSource(p) ⇒ p.asInstanceOf[Publisher[Out]] + case src: IterableSource[In] ⇒ materializeSource(src, flowName).asInstanceOf[Publisher[Out]] + case src: IteratorSource[In] ⇒ materializeSource(src, flowName).asInstanceOf[Publisher[Out]] + case src: TickSource[In] ⇒ materializeSource(src, flowName).asInstanceOf[Publisher[Out]] + } + + if (ops.isEmpty && specialCase.isDefinedAt(source)) { + val p = specialCase(source) + val sinkValue = sink.attach(p, this) + new MaterializedFlow(source, None, sink, sinkValue) + } else { + val (s, p) = + if (ops.isEmpty) { + val identityProcessor: Processor[In, Out] = processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[In, Out]] + (identityProcessor, identityProcessor) + } else { + val opsSize = ops.size + val outProcessor = processorForNode(ops.head, flowName, opsSize).asInstanceOf[Processor[In, Out]] + val topSubscriber = processorChain(outProcessor, ops.tail, flowName, opsSize - 1).asInstanceOf[Processor[In, Out]] + (topSubscriber, outProcessor) + } + val sourceValue = source.attach(s, this, flowName) + val sinkValue = sink.attach(p, this) + new MaterializedFlow(source, sourceValue, sink, sinkValue) + } } @@ -99,6 +126,35 @@ private[akka] case class ActorBasedFlowMaterializer( name = s"$flowName-0-iterable"), Some(source.iterable)) } + override def materializeSource[In](source: IteratorSource[In], flowName: String): Publisher[In] = { + if (source.iterator.isEmpty) EmptyPublisher[In] + else ActorPublisher[In](actorOf(IteratorPublisher.props(source.iterator, settings), + name = s"$flowName-0-iterator")) + } + + override def materializeSource[In](source: ThunkSource[In], flowName: String): Publisher[In] = { + ActorPublisher[In](actorOf(SimpleCallbackPublisher.props(settings, source.f), + name = s"$flowName-0-thunk")) + } + + override def materializeSource[In](source: FutureSource[In], flowName: String): Publisher[In] = { + source.future.value match { + case Some(Success(element)) ⇒ + ActorPublisher[In](actorOf(IterablePublisher.props(List(element), settings), + name = s"$flowName-0-future"), Some(source.future)) + case Some(Failure(t)) ⇒ + ErrorPublisher(t).asInstanceOf[Publisher[In]] + case None ⇒ + ActorPublisher[In](actorOf(FuturePublisher.props(source.future, settings), + name = s"$flowName-0-future"), Some(source.future)) + } + } + + override def materializeSource[In](source: TickSource[In], flowName: String): Publisher[In] = { + ActorPublisher[In](actorOf(TickPublisher.props(source.initialDelay, source.interval, source.tick, settings), + name = s"$flowName-0-tick")) + } + private def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { val impl = actorOf(ActorProcessorFactory.props(settings, op), s"$flowName-$n-${op.name}") ActorProcessorFactory(impl) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index ccd0f07e85..0f20021df0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -16,6 +16,7 @@ import akka.stream.impl.EmptyPublisher import akka.stream.impl.IterablePublisher import akka.stream.impl2.ActorBasedFlowMaterializer import org.reactivestreams._ +import scala.concurrent.duration.FiniteDuration sealed trait Flow @@ -27,15 +28,63 @@ object FlowFrom { def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil) /** - * Helper to create `Flow` with [[Source]] from `Iterable`. - * Example usage: `FlowFrom(Seq(1,2,3))` + * Helper to create `Flow` with [[Source]] from `Publisher`. + * + * Construct a transformation starting with given publisher. The transformation steps + * are executed by a series of [[org.reactivestreams.Processor]] instances + * that mediate the flow of elements downstream and the propagation of + * back-pressure upstream. */ - def apply[T](i: immutable.Iterable[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IterableSource(i)) + def apply[T](publisher: Publisher[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(PublisherSource(publisher)) /** - * Helper to create `Flow` with [[Source]] from `Publisher`. + * Helper to create `Flow` with [[Source]] from `Iterator`. + * Example usage: `FlowFrom(Seq(1,2,3).iterator)` + * + * Start a new `Flow` from the given Iterator. The produced stream of elements + * will continue until the iterator runs empty or fails during evaluation of + * the `next()` method. Elements are pulled out of the iterator + * in accordance with the demand coming from the downstream transformation + * steps. */ - def apply[T](p: Publisher[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(PublisherSource(p)) + def apply[T](iterator: Iterator[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IteratorSource(iterator)) + + /** + * Helper to create `Flow` with [[Source]] from `Iterable`. + * Example usage: `FlowFrom(Seq(1,2,3))` + * + * Starts a new `Flow` from the given `Iterable`. This is like starting from an + * Iterator, but every Subscriber directly attached to the Publisher of this + * stream will see an individual flow of elements (always starting from the + * beginning) regardless of when they subscribed. + */ + def apply[T](iterable: immutable.Iterable[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IterableSource(iterable)) + + /** + * Define the sequence of elements to be produced by the given closure. + * The stream ends normally when evaluation of the `Callable` returns a `None`. + * The stream ends exceptionally when an exception is thrown from the `Callable`. + */ + def apply[T](f: () ⇒ Option[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(ThunkSource(f)) + + /** + * Start a new `Flow` from the given `Future`. The stream will consist of + * one element when the `Future` is completed with a successful value, which + * may happen before or after materializing the `Flow`. + * The stream terminates with an error if the `Future` is completed with a failure. + */ + def apply[T](future: Future[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(FutureSource(future)) + + /** + * Elements are produced from the tick closure periodically with the specified interval. + * The tick element will be delivered to downstream consumers that has requested any elements. + * If a consumer has not requested any elements at the point in time when the tick + * element is produced it will not receive that tick element later. It will + * receive new tick elements as soon as it has requested more elements. + */ + def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): FlowWithSource[T, T] = + FlowFrom[T].withSource(TickSource(initialDelay, interval, tick)) + } trait Source[+In] { @@ -57,8 +106,7 @@ final case class SubscriberSource[In]() extends SourceKey[In, Subscriber[In]] { override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): Subscriber[In] = flowSubscriber - def subscriber(m: MaterializedSource): Subscriber[In] = - m.getSourceFor(this) + def subscriber(m: MaterializedSource): Subscriber[In] = m.getSourceFor(this) } /** @@ -67,7 +115,18 @@ final case class SubscriberSource[In]() extends SourceKey[In, Subscriber[In]] { final case class PublisherSource[In](p: Publisher[In]) extends Source[In] { override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = { p.subscribe(flowSubscriber) - p + None + } +} + +/** + * [[Source]] from `Iterator` + */ +final case class IteratorSource[In](iterator: Iterator[In]) extends Source[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = { + val p: Publisher[In] = materializer.materializeSource(this, flowName) + p.subscribe(flowSubscriber) + None } } @@ -78,15 +137,41 @@ final case class IterableSource[In](iterable: immutable.Iterable[In]) extends So override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = { val p: Publisher[In] = materializer.materializeSource(this, flowName) p.subscribe(flowSubscriber) - iterable + None + } +} + +/** + * [[Source]] from closure + */ +final case class ThunkSource[In](f: () ⇒ Option[In]) extends Source[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = { + val p: Publisher[In] = materializer.materializeSource(this, flowName) + p.subscribe(flowSubscriber) + None + } +} + +/** + * [[Source]] from closure + */ +final case class TickSource[In](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ In) extends Source[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = { + val p: Publisher[In] = materializer.materializeSource(this, flowName) + p.subscribe(flowSubscriber) + None } } /** * [[Source]] from `Future` */ -final case class FutureSource[In](f: Future[In]) extends Source[In] { - override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = ??? +final case class FutureSource[In](future: Future[In]) extends Source[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = { + val p: Publisher[In] = materializer.materializeSource(this, flowName) + p.subscribe(flowSubscriber) + None + } } trait Sink[-Out] { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala index 0e94c8512c..c519405997 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala @@ -3,19 +3,17 @@ */ package akka.stream.scaladsl2 -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ +import org.reactivestreams.Publisher +import akka.actor.ActorContext import akka.actor.ActorRefFactory +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.stream.MaterializerSettings import akka.stream.impl2.ActorBasedFlowMaterializer import akka.stream.impl2.Ast -import org.reactivestreams.{ Publisher, Subscriber } -import scala.concurrent.duration._ -import akka.actor.Deploy -import akka.actor.ExtendedActorSystem -import akka.actor.ActorContext -import akka.stream.impl2.StreamSupervisor import akka.stream.impl2.FlowNameCounter -import akka.stream.MaterializerSettings -import org.reactivestreams.Processor +import akka.stream.impl2.StreamSupervisor object FlowMaterializer { @@ -25,26 +23,68 @@ object FlowMaterializer { * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) * will be used to create one actor that in turn creates actors for the transformation steps. * + * The materializer's [[akka.stream.MaterializerSettings]] will be obtained from the + * configuration of the `context`'s underlying [[akka.actor.ActorSystem]]. + * * The `namePrefix` is used as the first part of the names of the actors running * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of * `namePrefix-flowNumber-flowStepNumber-stepName`. */ - def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = { - val system = context match { - case s: ExtendedActorSystem ⇒ s - case c: ActorContext ⇒ c.system - case null ⇒ throw new IllegalArgumentException("ActorRefFactory context must be defined") - case _ ⇒ throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, " + - "got [${_contex.getClass.getName}]") - } + def apply(materializerSettings: Option[MaterializerSettings] = None, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = { + val system = actorSystemOf(context) + + val settings = materializerSettings getOrElse MaterializerSettings(system) + apply(settings, namePrefix.getOrElse("flow"))(context) + } + + /** + * Scala API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def apply(materializerSettings: MaterializerSettings, namePrefix: String)(implicit context: ActorRefFactory): FlowMaterializer = { + val system = actorSystemOf(context) new ActorBasedFlowMaterializer( - settings, - context.actorOf(StreamSupervisor.props(settings).withDispatcher(settings.dispatcher)), + materializerSettings, + context.actorOf(StreamSupervisor.props(materializerSettings).withDispatcher(materializerSettings.dispatcher)), FlowNameCounter(system).counter, - namePrefix.getOrElse("flow")) + namePrefix) } + /** + * Scala API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def apply(materializerSettings: MaterializerSettings)(implicit context: ActorRefFactory): FlowMaterializer = + apply(Some(materializerSettings), None) + + /** + * Java API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * Defaults the actor name prefix used to name actors running the processing steps to `"flow"`. + * The actor names are built up of `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def create(context: ActorRefFactory): FlowMaterializer = + apply()(context) + /** * Java API: Creates a FlowMaterializer which will execute every step of a transformation * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] @@ -52,7 +92,33 @@ object FlowMaterializer { * will be used to create one actor that in turn creates actors for the transformation steps. */ def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer = - apply(settings)(context) + apply(Option(settings), None)(context) + + /** + * Java API: Creates a FlowMaterializer which will execute every step of a transformation + * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] + * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) + * will be used to create these actors, therefore it is *forbidden* to pass this object + * to another actor if the factory is an ActorContext. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. + */ + def create(settings: MaterializerSettings, context: ActorRefFactory, namePrefix: String): FlowMaterializer = + apply(Option(settings), Option(namePrefix))(context) + + private def actorSystemOf(context: ActorRefFactory): ActorSystem = { + val system = context match { + case s: ExtendedActorSystem ⇒ s + case c: ActorContext ⇒ c.system + case null ⇒ throw new IllegalArgumentException("ActorRefFactory context must be defined") + case _ ⇒ + throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, got [${context.getClass.getName}]") + } + system + } + } /** @@ -78,5 +144,13 @@ abstract class FlowMaterializer(val settings: MaterializerSettings) { def materializeSource[In](source: IterableSource[In], flowName: String): Publisher[In] + def materializeSource[In](source: IteratorSource[In], flowName: String): Publisher[In] + + def materializeSource[In](source: ThunkSource[In], flowName: String): Publisher[In] + + def materializeSource[In](source: FutureSource[In], flowName: String): Publisher[In] + + def materializeSource[In](source: TickSource[In], flowName: String): Publisher[In] + } diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowFromFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowFromFutureSpec.scala new file mode 100644 index 0000000000..89daaf48d2 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowFromFutureSpec.scala @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import scala.concurrent.{ Future, Promise } +import scala.concurrent.duration._ +import akka.stream.MaterializerSettings +import scala.util.control.NoStackTrace + +class FlowFromFutureSpec extends AkkaSpec { + + val settings = MaterializerSettings(system) + + implicit val materializer = FlowMaterializer(settings) + + "A Flow based on a Future" must { + "produce one element from already successful Future" in { + val p = FlowFrom(Future.successful(1)).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + c.expectNoMsg(100.millis) + sub.request(1) + c.expectNext(1) + c.expectComplete() + } + + "produce error from already failed Future" in { + val ex = new RuntimeException("test") with NoStackTrace + val p = FlowFrom(Future.failed[Int](ex)).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + c.expectError(ex) + } + + "produce one element when Future is completed" in { + val promise = Promise[Int]() + val p = FlowFrom(promise.future).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(1) + c.expectNoMsg(100.millis) + promise.success(1) + c.expectNext(1) + c.expectComplete() + c.expectNoMsg(100.millis) + } + + "produce one element when Future is completed but not before request" in { + val promise = Promise[Int]() + val p = FlowFrom(promise.future).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + promise.success(1) + c.expectNoMsg(200.millis) + sub.request(1) + c.expectNext(1) + c.expectComplete() + } + + "produce elements with multiple subscribers" in { + val promise = Promise[Int]() + val p = FlowFrom(promise.future).toPublisher() + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c1) + p.subscribe(c2) + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + sub1.request(1) + promise.success(1) + sub2.request(2) + c1.expectNext(1) + c2.expectNext(1) + c1.expectComplete() + c2.expectComplete() + } + + "produce elements to later subscriber" in { + val promise = Promise[Int]() + val p = FlowFrom(promise.future).toPublisher() + val keepAlive = StreamTestKit.SubscriberProbe[Int]() + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(keepAlive) + p.subscribe(c1) + + val sub1 = c1.expectSubscription() + sub1.request(1) + promise.success(1) + c1.expectNext(1) + c1.expectComplete() + p.subscribe(c2) + val sub2 = c2.expectSubscription() + sub2.request(1) + c2.expectNext(1) + c2.expectComplete() + } + + "allow cancel before receiving element" in { + val promise = Promise[Int]() + val p = FlowFrom(promise.future).toPublisher() + val keepAlive = StreamTestKit.SubscriberProbe[Int]() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(keepAlive) + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(1) + sub.cancel() + c.expectNoMsg(500.millis) + promise.success(1) + c.expectNoMsg(200.millis) + } + } +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowIterableSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowIterableSpec.scala new file mode 100644 index 0000000000..11281b9ebd --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowIterableSpec.scala @@ -0,0 +1,154 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.testkit.StreamTestKit.{ OnComplete, OnError, OnNext } +import scala.concurrent.duration._ +import akka.stream.MaterializerSettings + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowIterableSpec extends AkkaSpec { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 512) + + implicit val materializer = FlowMaterializer(settings) + + "A Flow based on an iterable" must { + "produce elements" in { + val p = FlowFrom(List(1, 2, 3)).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(1) + c.expectNext(1) + c.expectNoMsg(100.millis) + sub.request(2) + c.expectNext(2) + c.expectNext(3) + c.expectComplete() + } + + "complete empty" in { + val p = FlowFrom(List.empty[Int]).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + c.expectComplete() + c.expectNoMsg(100.millis) + + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c2) + c2.expectComplete() + } + + "produce elements with multiple subscribers" in { + val p = FlowFrom(List(1, 2, 3)).toPublisher() + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c1) + p.subscribe(c2) + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + sub1.request(1) + sub2.request(2) + c1.expectNext(1) + c2.expectNext(1) + c2.expectNext(2) + c1.expectNoMsg(100.millis) + c2.expectNoMsg(100.millis) + sub1.request(2) + sub2.request(2) + c1.expectNext(2) + c1.expectNext(3) + c2.expectNext(3) + c1.expectComplete() + c2.expectComplete() + } + + "produce elements to later subscriber" in { + val p = FlowFrom(List(1, 2, 3)).toPublisher() + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c1) + + val sub1 = c1.expectSubscription() + sub1.request(1) + c1.expectNext(1) + c1.expectNoMsg(100.millis) + p.subscribe(c2) + val sub2 = c2.expectSubscription() + sub2.request(2) + // starting from first element, new iterator per subscriber + c2.expectNext(1) + c2.expectNext(2) + c2.expectNoMsg(100.millis) + sub2.request(1) + c2.expectNext(3) + c2.expectComplete() + sub1.request(2) + c1.expectNext(2) + c1.expectNext(3) + c1.expectComplete() + } + + "produce elements with one transformation step" in { + val p = FlowFrom(List(1, 2, 3)).map(_ * 2).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(10) + c.expectNext(2) + c.expectNext(4) + c.expectNext(6) + c.expectComplete() + } + + "produce elements with two transformation steps" ignore { + // val p = FlowFrom(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toPublisher() + // val c = StreamTestKit.SubscriberProbe[Int]() + // p.subscribe(c) + // val sub = c.expectSubscription() + // sub.request(10) + // c.expectNext(4) + // c.expectNext(8) + // c.expectComplete() + } + + "allow cancel before receiving all elements" in { + val count = 100000 + val p = FlowFrom(1 to count).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(count) + c.expectNext(1) + sub.cancel() + val got = c.probe.receiveWhile(3.seconds) { + case _: OnNext[_] ⇒ + case OnComplete ⇒ fail("Cancel expected before OnComplete") + case OnError(e) ⇒ fail(e) + } + got.size should be < (count - 1) + } + + "have value equality of publisher" in { + val p1 = FlowFrom(List(1, 2, 3)).toPublisher() + val p2 = FlowFrom(List(1, 2, 3)).toPublisher() + p1 should be(p2) + p2 should be(p1) + val p3 = FlowFrom(List(1, 2, 3, 4)).toPublisher() + p1 should not be (p3) + p3 should not be (p1) + val p4 = FlowFrom(Vector.empty[String]).toPublisher() + val p5 = FlowFrom(Set.empty[String]).toPublisher() + p1 should not be (p4) + p4 should be(p5) + p5 should be(p4) + val p6 = FlowFrom(List(1, 2, 3).iterator).toPublisher() + p1 should not be (p6) + p6 should not be (p1) + } + } +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowIteratorSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowIteratorSpec.scala new file mode 100644 index 0000000000..6db382b711 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowIteratorSpec.scala @@ -0,0 +1,139 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.concurrent.duration._ +import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit.OnNext +import akka.stream.testkit.StreamTestKit.OnComplete +import akka.stream.testkit.StreamTestKit.OnError +import akka.stream.MaterializerSettings + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowIteratorSpec extends AkkaSpec { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 4, maxSize = 4) + + implicit val materializer = FlowMaterializer(settings) + + "A Flow based on an iterator" must { + "produce elements" in { + val p = FlowFrom(List(1, 2, 3).iterator).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(1) + c.expectNext(1) + c.expectNoMsg(100.millis) + sub.request(3) + c.expectNext(2) + c.expectNext(3) + c.expectComplete() + } + + "complete empty" in { + val p = FlowFrom(List.empty[Int].iterator).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + c.expectComplete() + c.expectNoMsg(100.millis) + + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c2) + c2.expectComplete() + } + + "produce elements with multiple subscribers" in { + val p = FlowFrom(List(1, 2, 3).iterator).toPublisher() + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c1) + p.subscribe(c2) + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + sub1.request(1) + sub2.request(2) + c1.expectNext(1) + c2.expectNext(1) + c2.expectNext(2) + c1.expectNoMsg(100.millis) + c2.expectNoMsg(100.millis) + sub1.request(2) + sub2.request(2) + c1.expectNext(2) + c1.expectNext(3) + c2.expectNext(3) + c1.expectComplete() + c2.expectComplete() + } + + "produce elements to later subscriber" in { + val p = FlowFrom(List(1, 2, 3).iterator).toPublisher() + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c1) + + val sub1 = c1.expectSubscription() + sub1.request(1) + c1.expectNext(1) + c1.expectNoMsg(100.millis) + p.subscribe(c2) + val sub2 = c2.expectSubscription() + sub2.request(3) + // element 1 is already gone + c2.expectNext(2) + c2.expectNext(3) + c2.expectComplete() + sub1.request(3) + c1.expectNext(2) + c1.expectNext(3) + c1.expectComplete() + } + + "produce elements with one transformation step" in { + val p = FlowFrom(List(1, 2, 3).iterator).map(_ * 2).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(10) + c.expectNext(2) + c.expectNext(4) + c.expectNext(6) + c.expectComplete() + } + + // FIXME enable test when filter is implemented + "produce elements with two transformation steps" ignore { + // val p = FlowFrom(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toPublisher() + // val c = StreamTestKit.SubscriberProbe[Int]() + // p.subscribe(c) + // val sub = c.expectSubscription() + // sub.request(10) + // c.expectNext(4) + // c.expectNext(8) + // c.expectComplete() + } + + "allow cancel before receiving all elements" in { + val count = 100000 + val p = FlowFrom((1 to count).iterator).toPublisher() + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(count) + c.expectNext(1) + sub.cancel() + val got = c.probe.receiveWhile(3.seconds) { + case _: OnNext[_] ⇒ + case OnComplete ⇒ fail("Cancel expected before OnComplete") + case OnError(e) ⇒ fail(e) + } + got.size should be < (count - 1) + } + + } +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/TickPublisherSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/TickPublisherSpec.scala new file mode 100644 index 0000000000..a7fbe39f94 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/TickPublisherSpec.scala @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.concurrent.duration._ +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import scala.util.control.NoStackTrace + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class TickPublisherSpec extends AkkaSpec { + + implicit val materializer = FlowMaterializer() + + "A Flow based on tick publisher" must { + "produce ticks" in { + val tickGen = Iterator from 1 + val c = StreamTestKit.SubscriberProbe[String]() + FlowFrom(1.second, 500.millis, () ⇒ "tick-" + tickGen.next()).publishTo(c) + val sub = c.expectSubscription() + sub.request(3) + c.expectNoMsg(600.millis) + c.expectNext("tick-1") + c.expectNoMsg(200.millis) + c.expectNext("tick-2") + c.expectNoMsg(200.millis) + c.expectNext("tick-3") + sub.cancel() + c.expectNoMsg(200.millis) + } + + "drop ticks when not requested" in { + val tickGen = Iterator from 1 + val c = StreamTestKit.SubscriberProbe[String]() + FlowFrom(1.second, 1.second, () ⇒ "tick-" + tickGen.next()).publishTo(c) + val sub = c.expectSubscription() + sub.request(2) + c.expectNext("tick-1") + c.expectNoMsg(200.millis) + c.expectNext("tick-2") + c.expectNoMsg(1400.millis) + sub.request(2) + c.expectNext("tick-4") + c.expectNoMsg(200.millis) + c.expectNext("tick-5") + sub.cancel() + c.expectNoMsg(200.millis) + } + + "produce ticks with multiple subscribers" in { + val tickGen = Iterator from 1 + val p = FlowFrom(1.second, 1.second, () ⇒ "tick-" + tickGen.next()).toPublisher() + val c1 = StreamTestKit.SubscriberProbe[String]() + val c2 = StreamTestKit.SubscriberProbe[String]() + p.subscribe(c1) + p.subscribe(c2) + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + sub1.request(1) + sub2.request(2) + c1.expectNext("tick-1") + c2.expectNext("tick-1") + c2.expectNoMsg(200.millis) + c2.expectNext("tick-2") + c1.expectNoMsg(200.millis) + sub1.request(2) + sub2.request(2) + c1.expectNext("tick-3") + c2.expectNext("tick-3") + sub1.cancel() + sub2.cancel() + } + + "signal onError when tick closure throws" in { + val c = StreamTestKit.SubscriberProbe[String]() + FlowFrom(1.second, 1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace).publishTo(c) + val sub = c.expectSubscription() + sub.request(3) + c.expectError.getMessage should be("tick err") + } + + // FIXME enable this test again when zip is back + "be usable with zip for a simple form of rate limiting" ignore { + // val c = StreamTestKit.SubscriberProbe[Int]() + // val rate = FlowFrom(1.second, 1.second, () ⇒ "tick").toPublisher() + // FlowFrom(1 to 100).zip(rate).map { case (n, _) ⇒ n }.publishTo(c) + // val sub = c.expectSubscription() + // sub.request(1000) + // c.expectNext(1) + // c.expectNoMsg(200.millis) + // c.expectNext(2) + // c.expectNoMsg(200.millis) + // sub.cancel() + } + + } +} \ No newline at end of file