diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala index 083c0744f0..5264fc1ca5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -4,37 +4,18 @@ package akka.stream.impl2 import java.util.concurrent.atomic.AtomicLong -import akka.actor.{ Actor, ActorCell, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider, LocalActorRef, Props, RepointableActorRef } -import akka.pattern.ask -import org.reactivestreams.{ Processor, Publisher, Subscriber } + import scala.annotation.tailrec import scala.collection.immutable -import scala.concurrent.{ Await, Future } -import scala.concurrent.duration._ -import scala.util.{ Failure, Success } -import akka.stream.Transformer -import akka.stream.scaladsl2.FlowMaterializer -import akka.stream.MaterializerSettings -import akka.stream.impl.ActorPublisher -import akka.stream.impl.IterablePublisher -import akka.stream.impl.IteratorPublisher -import akka.stream.impl.TransformProcessorImpl -import akka.stream.impl.ActorProcessor -import akka.stream.impl.ExposedPublisher -import akka.stream.scaladsl2.Source -import akka.stream.scaladsl2.Sink -import akka.stream.scaladsl2.MaterializedFlow -import akka.stream.scaladsl2.IterableSource -import akka.stream.impl.EmptyPublisher -import akka.stream.scaladsl2.IteratorSource -import akka.stream.scaladsl2.PublisherSource -import akka.stream.scaladsl2.ThunkSource -import akka.stream.impl.SimpleCallbackPublisher -import akka.stream.scaladsl2.FutureSource -import akka.stream.impl.FuturePublisher -import akka.stream.impl.ErrorPublisher -import akka.stream.impl.TickPublisher -import akka.stream.scaladsl2.TickSource +import scala.concurrent.Await + +import org.reactivestreams.{ Processor, Publisher, Subscriber } + +import akka.actor._ +import akka.pattern.ask +import akka.stream.{ MaterializerSettings, Transformer } +import akka.stream.impl.{ ActorProcessor, ActorPublisher, ExposedPublisher, TransformProcessorImpl } +import akka.stream.scaladsl2._ /** * INTERNAL API @@ -51,12 +32,12 @@ private[akka] object Ast { /** * INTERNAL API */ -private[akka] case class ActorBasedFlowMaterializer( - override val settings: MaterializerSettings, - supervisor: ActorRef, - flowNameCounter: AtomicLong, - namePrefix: String) +case class ActorBasedFlowMaterializer(override val settings: MaterializerSettings, + supervisor: ActorRef, + flowNameCounter: AtomicLong, + namePrefix: String) extends FlowMaterializer(settings) { + import akka.stream.impl2.Ast._ def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name) @@ -80,87 +61,67 @@ private[akka] case class ActorBasedFlowMaterializer( override def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow = { val flowName = createFlowName() - // FIXME specialcasing, otherwise some tests fail in FlowIterableSpec due to the injected identityProcessor: - // - "have value equality of publisher" - // - "produce elements to later subscriber" - def specialCase: PartialFunction[Source[In], Publisher[Out]] = { - case PublisherSource(p) ⇒ p.asInstanceOf[Publisher[Out]] - case src: IterableSource[In] ⇒ materializeSource(src, flowName).asInstanceOf[Publisher[Out]] - case src: IteratorSource[In] ⇒ materializeSource(src, flowName).asInstanceOf[Publisher[Out]] - case src: TickSource[In] ⇒ materializeSource(src, flowName).asInstanceOf[Publisher[Out]] + def attachSink(pub: Publisher[Out]) = sink match { + case s: SimpleSink[Out] ⇒ s.attach(pub, this, flowName) + case s: SinkWithKey[Out, _] ⇒ s.attach(pub, this, flowName) + case _ ⇒ throw new MaterializationException("unknown Sink type " + sink.getClass) + } + def attachSource(sub: Subscriber[In]) = source match { + case s: SimpleSource[In] ⇒ s.attach(sub, this, flowName) + case s: SourceWithKey[In, _] ⇒ s.attach(sub, this, flowName) + case _ ⇒ throw new MaterializationException("unknown Source type " + sink.getClass) + } + def createSink() = sink.asInstanceOf[Sink[In]] match { + case s: SimpleSink[In] ⇒ s.create(this, flowName) -> (()) + case s: SinkWithKey[In, _] ⇒ s.create(this, flowName) + case _ ⇒ throw new MaterializationException("unknown Sink type " + sink.getClass) + } + def createSource() = source.asInstanceOf[Source[Out]] match { + case s: SimpleSource[Out] ⇒ s.create(this, flowName) -> (()) + case s: SourceWithKey[Out, _] ⇒ s.create(this, flowName) + case _ ⇒ throw new MaterializationException("unknown Source type " + sink.getClass) + } + def isActive(s: AnyRef) = s match { + case source: SimpleSource[_] ⇒ source.isActive + case source: SourceWithKey[_, _] ⇒ source.isActive + case sink: SimpleSink[_] ⇒ sink.isActive + case sink: SinkWithKey[_, _] ⇒ sink.isActive + case _: Source[_] ⇒ throw new MaterializationException("unknown Source type " + sink.getClass) + case _: Sink[_] ⇒ throw new MaterializationException("unknown Sink type " + sink.getClass) } - if (ops.isEmpty && specialCase.isDefinedAt(source)) { - val p = specialCase(source) - val sinkValue = sink.attach(p, this) - new MaterializedFlow(source, None, sink, sinkValue) - } else { - val (s, p) = - if (ops.isEmpty) { - val identityProcessor: Processor[In, Out] = processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[In, Out]] - (identityProcessor, identityProcessor) + val (sourceValue, sinkValue) = + if (ops.isEmpty) { + if (isActive(sink)) { + val (sub, value) = createSink() + (attachSource(sub), value) + } else if (isActive(source)) { + val (pub, value) = createSource() + (value, attachSink(pub)) } else { - val opsSize = ops.size - val outProcessor = processorForNode(ops.head, flowName, opsSize).asInstanceOf[Processor[In, Out]] - val topSubscriber = processorChain(outProcessor, ops.tail, flowName, opsSize - 1).asInstanceOf[Processor[In, Out]] - (topSubscriber, outProcessor) + val id: Processor[In, Out] = processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[In, Out]] + (attachSource(id), attachSink(id)) } - val sourceValue = source.attach(s, this, flowName) - val sinkValue = sink.attach(p, this) - new MaterializedFlow(source, sourceValue, sink, sinkValue) - } - + } else { + val opsSize = ops.size + val last = processorForNode(ops.head, flowName, opsSize).asInstanceOf[Processor[Any, Out]] + val first = processorChain(last, ops.tail, flowName, opsSize - 1).asInstanceOf[Processor[In, Any]] + (attachSource(first), attachSink(last)) + } + new MaterializedFlow(source, sourceValue, sink, sinkValue) } - private def identityProcessor[I](flowName: String): Processor[I, I] = - processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[I, I]] - private val identityTransform = Transform("identity", () ⇒ new Transformer[Any, Any] { override def onNext(element: Any) = List(element) }) - override def materializeSource[In](source: IterableSource[In], flowName: String): Publisher[In] = { - if (source.iterable.isEmpty) EmptyPublisher[In] - else ActorPublisher(actorOf(IterablePublisher.props(source.iterable, settings), - name = s"$flowName-0-iterable"), Some(source.iterable)) - } - - override def materializeSource[In](source: IteratorSource[In], flowName: String): Publisher[In] = { - if (source.iterator.isEmpty) EmptyPublisher[In] - else ActorPublisher[In](actorOf(IteratorPublisher.props(source.iterator, settings), - name = s"$flowName-0-iterator")) - } - - override def materializeSource[In](source: ThunkSource[In], flowName: String): Publisher[In] = { - ActorPublisher[In](actorOf(SimpleCallbackPublisher.props(settings, source.f), - name = s"$flowName-0-thunk")) - } - - override def materializeSource[In](source: FutureSource[In], flowName: String): Publisher[In] = { - source.future.value match { - case Some(Success(element)) ⇒ - ActorPublisher[In](actorOf(IterablePublisher.props(List(element), settings), - name = s"$flowName-0-future"), Some(source.future)) - case Some(Failure(t)) ⇒ - ErrorPublisher(t).asInstanceOf[Publisher[In]] - case None ⇒ - ActorPublisher[In](actorOf(FuturePublisher.props(source.future, settings), - name = s"$flowName-0-future"), Some(source.future)) - } - } - - override def materializeSource[In](source: TickSource[In], flowName: String): Publisher[In] = { - ActorPublisher[In](actorOf(TickPublisher.props(source.initialDelay, source.interval, source.tick, settings), - name = s"$flowName-0-tick")) - } - private def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { val impl = actorOf(ActorProcessorFactory.props(settings, op), s"$flowName-$n-${op.name}") ActorProcessorFactory(impl) } - private def actorOf(props: Props, name: String): ActorRef = supervisor match { + def actorOf(props: Props, name: String): ActorRef = supervisor match { case ref: LocalActorRef ⇒ ref.underlying.attachChild(props, name, systemService = false) case ref: RepointableActorRef ⇒ @@ -228,4 +189,4 @@ private[akka] object ActorProcessorFactory { impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]]) p } -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index 013beb101d..23bdd9167c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -21,312 +21,13 @@ import scala.util.Try import scala.util.Failure import scala.util.Success +/** + * This is the interface from which all concrete Flows inherit. No generic + * operations are presented because the concrete type of Flow (i.e. whether + * it has a [[Source]] or a [[Sink]]) determines what is available. + */ sealed trait Flow -object FlowFrom { - /** - * Helper to create `Flow` without [[Source]]. - * Example usage: `FlowFrom[Int]` - */ - def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil) - - /** - * Helper to create `Flow` with [[Source]] from `Publisher`. - * - * Construct a transformation starting with given publisher. The transformation steps - * are executed by a series of [[org.reactivestreams.Processor]] instances - * that mediate the flow of elements downstream and the propagation of - * back-pressure upstream. - */ - def apply[T](publisher: Publisher[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(PublisherSource(publisher)) - - /** - * Helper to create `Flow` with [[Source]] from `Iterator`. - * Example usage: `FlowFrom(Seq(1,2,3).iterator)` - * - * Start a new `Flow` from the given Iterator. The produced stream of elements - * will continue until the iterator runs empty or fails during evaluation of - * the `next()` method. Elements are pulled out of the iterator - * in accordance with the demand coming from the downstream transformation - * steps. - */ - def apply[T](iterator: Iterator[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IteratorSource(iterator)) - - /** - * Helper to create `Flow` with [[Source]] from `Iterable`. - * Example usage: `FlowFrom(Seq(1,2,3))` - * - * Starts a new `Flow` from the given `Iterable`. This is like starting from an - * Iterator, but every Subscriber directly attached to the Publisher of this - * stream will see an individual flow of elements (always starting from the - * beginning) regardless of when they subscribed. - */ - def apply[T](iterable: immutable.Iterable[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IterableSource(iterable)) - - /** - * Define the sequence of elements to be produced by the given closure. - * The stream ends normally when evaluation of the closure returns a `None`. - * The stream ends exceptionally when an exception is thrown from the closure. - */ - def apply[T](f: () ⇒ Option[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(ThunkSource(f)) - - /** - * Start a new `Flow` from the given `Future`. The stream will consist of - * one element when the `Future` is completed with a successful value, which - * may happen before or after materializing the `Flow`. - * The stream terminates with an error if the `Future` is completed with a failure. - */ - def apply[T](future: Future[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(FutureSource(future)) - - /** - * Elements are produced from the tick closure periodically with the specified interval. - * The tick element will be delivered to downstream consumers that has requested any elements. - * If a consumer has not requested any elements at the point in time when the tick - * element is produced it will not receive that tick element later. It will - * receive new tick elements as soon as it has requested more elements. - */ - def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): FlowWithSource[T, T] = - FlowFrom[T].withSource(TickSource(initialDelay, interval, tick)) - -} - -trait Source[+In] { - def attach(flowSubscriber: Subscriber[In] @uncheckedVariance, materializer: FlowMaterializer, flowName: String): Any -} - -trait SourceKey[+In, T] extends Source[In] { - override def attach(flowSubscriber: Subscriber[In] @uncheckedVariance, materializer: FlowMaterializer, flowName: String): T - // these are unique keys, case class equality would break them - final override def equals(other: Any): Boolean = super.equals(other) - final override def hashCode: Int = super.hashCode -} - -/** - * Holds a `Subscriber` representing the input side of the flow. - * The `Subscriber` can later be connected to an upstream `Publisher`. - */ -final case class SubscriberSource[In]() extends SourceKey[In, Subscriber[In]] { - override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): Subscriber[In] = - flowSubscriber - - def subscriber(m: MaterializedSource): Subscriber[In] = m.getSourceFor(this) -} - -/** - * Construct a transformation starting with given publisher. The transformation steps - * are executed by a series of [[org.reactivestreams.Processor]] instances - * that mediate the flow of elements downstream and the propagation of - * back-pressure upstream. - */ -final case class PublisherSource[In](p: Publisher[In]) extends Source[In] { - override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = { - p.subscribe(flowSubscriber) - None - } -} - -/** - * Start a new `Flow` from the given Iterator. The produced stream of elements - * will continue until the iterator runs empty or fails during evaluation of - * the `next()` method. Elements are pulled out of the iterator - * in accordance with the demand coming from the downstream transformation - * steps. - */ -final case class IteratorSource[In](iterator: Iterator[In]) extends Source[In] { - override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = { - val p: Publisher[In] = materializer.materializeSource(this, flowName) - p.subscribe(flowSubscriber) - None - } -} - -/** - * Starts a new `Flow` from the given `Iterable`. This is like starting from an - * Iterator, but every Subscriber directly attached to the Publisher of this - * stream will see an individual flow of elements (always starting from the - * beginning) regardless of when they subscribed. - */ -final case class IterableSource[In](iterable: immutable.Iterable[In]) extends Source[In] { - override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = { - val p: Publisher[In] = materializer.materializeSource(this, flowName) - p.subscribe(flowSubscriber) - None - } -} - -/** - * Define the sequence of elements to be produced by the given closure. - * The stream ends normally when evaluation of the closure returns a `None`. - * The stream ends exceptionally when an exception is thrown from the closure. - */ -final case class ThunkSource[In](f: () ⇒ Option[In]) extends Source[In] { - override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = { - val p: Publisher[In] = materializer.materializeSource(this, flowName) - p.subscribe(flowSubscriber) - None - } -} - -/** - * Start a new `Flow` from the given `Future`. The stream will consist of - * one element when the `Future` is completed with a successful value, which - * may happen before or after materializing the `Flow`. - * The stream terminates with an error if the `Future` is completed with a failure. - */ -final case class FutureSource[In](future: Future[In]) extends Source[In] { - override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = { - val p: Publisher[In] = materializer.materializeSource(this, flowName) - p.subscribe(flowSubscriber) - None - } -} - -/** - * Elements are produced from the tick closure periodically with the specified interval. - * The tick element will be delivered to downstream consumers that has requested any elements. - * If a consumer has not requested any elements at the point in time when the tick - * element is produced it will not receive that tick element later. It will - * receive new tick elements as soon as it has requested more elements. - */ -final case class TickSource[In](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ In) extends Source[In] { - override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = { - val p: Publisher[In] = materializer.materializeSource(this, flowName) - p.subscribe(flowSubscriber) - None - } -} - -trait Sink[-Out] { - def attach(flowPublisher: Publisher[Out @uncheckedVariance], materializer: FlowMaterializer): Any -} - -trait SinkKey[-Out, T] extends Sink[Out] { - override def attach(flowPublisher: Publisher[Out @uncheckedVariance], materializer: FlowMaterializer): T - // these are unique keys, case class equality would break them - final override def equals(other: Any): Boolean = super.equals(other) - final override def hashCode: Int = super.hashCode -} - -/** - * Holds the downstream-most [[org.reactivestreams.Publisher]] interface of the materialized flow. - * The stream will not have any subscribers attached at this point, which means that after prefetching - * elements to fill the internal buffers it will assert back-pressure until - * a subscriber connects and creates demand for elements to be emitted. - */ -object PublisherSink { - private val instance = new PublisherSink[Nothing] - def apply[T]: PublisherSink[T] = instance.asInstanceOf[PublisherSink[T]] -} - -class PublisherSink[Out]() extends SinkKey[Out, Publisher[Out]] { - def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): Publisher[Out] = flowPublisher - def publisher(m: MaterializedSink): Publisher[Out] = m.getSinkFor(this) - - override def toString: String = "FutureSink" -} - -/** - * Holds a [[scala.concurrent.Future]] that will be fulfilled with the first - * thing that is signaled to this stream, which can be either an element (after - * which the upstream subscription is canceled), an error condition (putting - * the Future into the corresponding failed state) or the end-of-stream - * (failing the Future with a NoSuchElementException). - */ -object FutureSink { - private val instance = new FutureSink[Nothing] - def apply[T]: FutureSink[T] = instance.asInstanceOf[FutureSink[T]] -} - -class FutureSink[Out] extends SinkKey[Out, Future[Out]] { - def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): Future[Out] = { - val p = Promise[Out]() - FlowFrom(flowPublisher).transform("futureSink", () ⇒ new Transformer[Out, Unit] { - var done = false - override def onNext(in: Out) = { p success in; done = true; Nil } - override def onError(e: Throwable) = { p failure e } - override def isComplete = done - override def onTermination(e: Option[Throwable]) = { p.tryFailure(new NoSuchElementException("empty stream")); Nil } - }).consume()(materializer) - p.future - } - - def future(m: MaterializedSink): Future[Out] = m.getSinkFor(this) - - override def toString: String = "FutureSink" -} - -/** - * Attaches a subscriber to this stream which will just discard all received - * elements. - */ -final case object BlackholeSink extends Sink[Any] { - override def attach(flowPublisher: Publisher[Any], materializer: FlowMaterializer): AnyRef = { - val s = new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize) - flowPublisher.subscribe(s) - None - } -} - -/** - * Attaches a subscriber to this stream. - */ -final case class SubscriberSink[Out](subscriber: Subscriber[Out]) extends Sink[Out] { - override def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): AnyRef = { - flowPublisher.subscribe(subscriber) - None - } -} - -object OnCompleteSink { - private val SuccessUnit = Success[Unit](()) -} - -/** - * When the flow is completed, either through an error or normal - * completion, apply the provided function with [[scala.util.Success]] - * or [[scala.util.Failure]]. - */ -final case class OnCompleteSink[Out](callback: Try[Unit] ⇒ Unit) extends Sink[Out] { - override def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): AnyRef = { - FlowFrom(flowPublisher).transform("onCompleteSink", () ⇒ new Transformer[Out, Unit] { - override def onNext(in: Out) = Nil - override def onError(e: Throwable) = { - callback(Failure(e)) - throw e - } - override def onTermination(e: Option[Throwable]) = { - callback(OnCompleteSink.SuccessUnit) - Nil - } - }).consume()(materializer) - None - } -} - -/** - * Invoke the given procedure for each received element. The sink holds a [[scala.concurrent.Future]] - * that will be completed with `Success` when reaching the normal end of the stream, or completed - * with `Failure` if there is an error is signaled in the stream. - */ -final case class ForeachSink[Out](f: Out ⇒ Unit) extends SinkKey[Out, Future[Unit]] { - override def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): Future[Unit] = { - val promise = Promise[Unit]() - FlowFrom(flowPublisher).transform("foreach", () ⇒ new Transformer[Out, Unit] { - override def onNext(in: Out) = { f(in); Nil } - override def onError(cause: Throwable): Unit = () - override def onTermination(e: Option[Throwable]) = { - e match { - case None ⇒ promise.success(()) - case Some(e) ⇒ promise.failure(e) - } - Nil - } - }).consume()(materializer) - promise.future - } - def future(m: MaterializedSink): Future[Unit] = m.getSinkFor(this) -} - /** * Marker interface for flows that have a free (attachable) input side. */ @@ -379,11 +80,11 @@ final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends FlowOps[In */ final case class FlowWithSink[-In, +Out](private[scaladsl2] val output: Sink[Out @uncheckedVariance], ops: List[AstNode]) extends HasNoSource[In] { - def withSource(in: Source[In]): RunnableFlow[In, Out] = new RunnableFlow(in, output, ops) + def withSource(in: Source[In]): RunnableFlow[In, Out] = RunnableFlow(in, output, ops) def withoutSink: ProcessorFlow[In, Out] = ProcessorFlow(ops) def prepend[T](f: ProcessorFlow[T, In]): FlowWithSink[T, Out] = FlowWithSink(output, ops ::: f.ops) - def prepend[T](f: FlowWithSource[T, In]): RunnableFlow[T, Out] = new RunnableFlow(f.input, output, ops ::: f.ops) + def prepend[T](f: FlowWithSource[T, In]): RunnableFlow[T, Out] = RunnableFlow(f.input, output, ops ::: f.ops) def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] = { val subIn = SubscriberSource[In]() @@ -400,11 +101,11 @@ final case class FlowWithSource[-In, +Out](private[scaladsl2] val input: Source[ override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops) - def withSink(out: Sink[Out]): RunnableFlow[In, Out] = new RunnableFlow(input, out, ops) + def withSink(out: Sink[Out]): RunnableFlow[In, Out] = RunnableFlow(input, out, ops) def withoutSource: ProcessorFlow[In, Out] = ProcessorFlow(ops) def append[T](f: ProcessorFlow[Out, T]): FlowWithSource[In, T] = FlowWithSource(input, f.ops ++: ops) - def append[T](f: FlowWithSink[Out, T]): RunnableFlow[In, T] = new RunnableFlow(input, f.output, f.ops ++: ops) + def append[T](f: FlowWithSink[Out, T]): RunnableFlow[In, T] = RunnableFlow(input, f.output, f.ops ++: ops) def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = { val pubOut = PublisherSink[Out] @@ -433,19 +134,19 @@ final case class RunnableFlow[-In, +Out](private[scaladsl2] val input: Source[In } class MaterializedFlow(sourceKey: AnyRef, matSource: Any, sinkKey: AnyRef, matSink: Any) extends MaterializedSource with MaterializedSink { - override def getSourceFor[T](key: SourceKey[_, T]): T = + override def getSourceFor[T](key: SourceWithKey[_, T]): T = if (key == sourceKey) matSource.asInstanceOf[T] else throw new IllegalArgumentException(s"Source key [$key] doesn't match the source [$sourceKey] of this flow") - def getSinkFor[T](key: SinkKey[_, T]): T = + def getSinkFor[T](key: SinkWithKey[_, T]): T = if (key == sinkKey) matSink.asInstanceOf[T] else throw new IllegalArgumentException(s"Sink key [$key] doesn't match the sink [$sinkKey] of this flow") } trait MaterializedSource { - def getSourceFor[T](sourceKey: SourceKey[_, T]): T + def getSourceFor[T](sourceKey: SourceWithKey[_, T]): T } trait MaterializedSink { - def getSinkFor[T](sinkKey: SinkKey[_, T]): T + def getSinkFor[T](sinkKey: SinkWithKey[_, T]): T } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala index c519405997..f444f0ed94 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala @@ -3,17 +3,9 @@ */ package akka.stream.scaladsl2 -import scala.concurrent.duration._ -import org.reactivestreams.Publisher -import akka.actor.ActorContext -import akka.actor.ActorRefFactory -import akka.actor.ActorSystem -import akka.actor.ExtendedActorSystem +import akka.actor.{ ActorContext, ActorRefFactory, ActorSystem, ExtendedActorSystem } import akka.stream.MaterializerSettings -import akka.stream.impl2.ActorBasedFlowMaterializer -import akka.stream.impl2.Ast -import akka.stream.impl2.FlowNameCounter -import akka.stream.impl2.StreamSupervisor +import akka.stream.impl2.{ ActorBasedFlowMaterializer, Ast, FlowNameCounter, StreamSupervisor } object FlowMaterializer { @@ -131,26 +123,25 @@ object FlowMaterializer { abstract class FlowMaterializer(val settings: MaterializerSettings) { /** - * The `namePrefix` is used as the first part of the names of the actors running - * the processing steps. + * The `namePrefix` shall be used for deriving the names of processing + * entities that are created during materialization. This is meant to aid + * logging and error reporting both during materialization and while the + * stream is running. */ def withNamePrefix(name: String): FlowMaterializer /** - * INTERNAL API - * ops are stored in reverse order + * This method interprets the given Flow description and creates the running + * stream. The result can be highly implementation specific, ranging from + * local actor chains to remote-deployed processing networks. */ - private[akka] def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow - - def materializeSource[In](source: IterableSource[In], flowName: String): Publisher[In] - - def materializeSource[In](source: IteratorSource[In], flowName: String): Publisher[In] - - def materializeSource[In](source: ThunkSource[In], flowName: String): Publisher[In] - - def materializeSource[In](source: FutureSource[In], flowName: String): Publisher[In] - - def materializeSource[In](source: TickSource[In], flowName: String): Publisher[In] + def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow } +/** + * This exception or subtypes thereof should be used to signal materialization + * failures. + */ +class MaterializationException(msg: String, cause: Throwable = null) extends RuntimeException(msg, cause) + diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala new file mode 100644 index 0000000000..81ac1cf31f --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala @@ -0,0 +1,218 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.annotation.unchecked.uncheckedVariance +import scala.concurrent.{ Future, Promise } +import scala.util.{ Failure, Success, Try } +import org.reactivestreams.{ Publisher, Subscriber, Subscription } +import akka.stream.Transformer +import akka.stream.impl.BlackholeSubscriber +import akka.stream.impl2.ActorBasedFlowMaterializer +import java.util.concurrent.atomic.AtomicReference + +/** + * This trait is a marker for a pluggable stream sink. Concrete instances should + * implement [[SinkWithKey]] or [[SimpleSink]], otherwise a custom [[FlowMaterializer]] + * will have to be used to be able to attach them. + * + * All Sinks defined in this package rely upon an [[ActorBasedFlowMaterializer]] being + * made available to them in order to use the attach method. Other + * FlowMaterializers can be used but must then implement the functionality of these + * Sink nodes themselves (or construct an ActorBasedFlowMaterializer). + */ +trait Sink[-Out] + +/** + * A sink that does not need to create a user-accessible object during materialization. + */ +trait SimpleSink[-Out] extends Sink[Out] { + /** + * Attach this sink to the given [[org.reactivestreams.Publisher]]. Using the given + * [[FlowMaterializer]] is completely optional, especially if this sink belongs to + * a different Reactive Streams implementation. It is the responsibility of the + * caller to provide a suitable FlowMaterializer that can be used for running + * Flows if necessary. + * + * @param flowPublisher the Publisher to consume elements from + * @param materializer a FlowMaterializer that may be used for creating flows + * @param flowName the name of the current flow, which should be used in log statements or error messages + */ + def attach(flowPublisher: Publisher[Out @uncheckedVariance], materializer: ActorBasedFlowMaterializer, flowName: String): Unit + /** + * This method is only used for Sinks that return true from [[#isActive]], which then must + * implement it. + */ + def create(materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Out] @uncheckedVariance = + throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true") + /** + * This method indicates whether this Sink can create a Subscriber instead of being + * attached to a Publisher. This is only used if the Flow does not contain any + * operations. + */ + def isActive: Boolean = false +} + +/** + * A sink that will create an object during materialization that the user will need + * to retrieve in order to access aspects of this sink (could be a completion Future + * or a cancellation handle, etc.) + */ +trait SinkWithKey[-Out, T] extends Sink[Out] { + /** + * Attach this sink to the given [[org.reactivestreams.Publisher]]. Using the given + * [[FlowMaterializer]] is completely optional, especially if this sink belongs to + * a different Reactive Streams implementation. It is the responsibility of the + * caller to provide a suitable FlowMaterializer that can be used for running + * Flows if necessary. + * + * @param flowPublisher the Publisher to consume elements from + * @param materializer a FlowMaterializer that may be used for creating flows + * @param flowName the name of the current flow, which should be used in log statements or error messages + */ + def attach(flowPublisher: Publisher[Out @uncheckedVariance], materializer: ActorBasedFlowMaterializer, flowName: String): T + /** + * This method is only used for Sinks that return true from [[#isActive]], which then must + * implement it. + */ + def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Subscriber[Out] @uncheckedVariance, T) = + throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true") + /** + * This method indicates whether this Sink can create a Subscriber instead of being + * attached to a Publisher. This is only used if the Flow does not contain any + * operations. + */ + def isActive: Boolean = false + + // these are unique keys, case class equality would break them + final override def equals(other: Any): Boolean = super.equals(other) + final override def hashCode: Int = super.hashCode +} + +/** + * Holds the downstream-most [[org.reactivestreams.Publisher]] interface of the materialized flow. + * The stream will not have any subscribers attached at this point, which means that after prefetching + * elements to fill the internal buffers it will assert back-pressure until + * a subscriber connects and creates demand for elements to be emitted. + */ +object PublisherSink { + private val instance = new PublisherSink[Nothing] + def apply[T]: PublisherSink[T] = instance.asInstanceOf[PublisherSink[T]] +} + +class PublisherSink[Out]() extends SinkWithKey[Out, Publisher[Out]] { + def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[Out] = flowPublisher + def publisher(m: MaterializedSink): Publisher[Out] = m.getSinkFor(this) + + override def toString: String = "PublisherSink" +} + +/** + * Holds a [[scala.concurrent.Future]] that will be fulfilled with the first + * thing that is signaled to this stream, which can be either an element (after + * which the upstream subscription is canceled), an error condition (putting + * the Future into the corresponding failed state) or the end-of-stream + * (failing the Future with a NoSuchElementException). + */ +object FutureSink { + private val instance = new FutureSink[Nothing] + def apply[T]: FutureSink[T] = instance.asInstanceOf[FutureSink[T]] +} + +class FutureSink[Out] extends SinkWithKey[Out, Future[Out]] { + def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Future[Out] = { + val (sub, f) = create(materializer, flowName) + flowPublisher.subscribe(sub) + f + } + override def isActive = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Subscriber[Out], Future[Out]) = { + val p = Promise[Out]() + val sub = new Subscriber[Out] { // TODO #15804 verify this using the RS TCK + private val sub = new AtomicReference[Subscription] + override def onSubscribe(s: Subscription): Unit = + if (!sub.compareAndSet(null, s)) s.cancel() + else s.request(1) + override def onNext(t: Out): Unit = { p.trySuccess(t); sub.get.cancel() } + override def onError(t: Throwable): Unit = p.tryFailure(t) + override def onComplete(): Unit = p.tryFailure(new NoSuchElementException("empty stream")) + } + (sub, p.future) + } + + def future(m: MaterializedSink): Future[Out] = m.getSinkFor(this) + + override def toString: String = "FutureSink" +} + +/** + * Attaches a subscriber to this stream which will just discard all received + * elements. + */ +final case object BlackholeSink extends SimpleSink[Any] { + override def attach(flowPublisher: Publisher[Any], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + flowPublisher.subscribe(create(materializer, flowName)) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Any] = + new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize) +} + +/** + * Attaches a subscriber to this stream. + */ +final case class SubscriberSink[Out](subscriber: Subscriber[Out]) extends SimpleSink[Out] { + override def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + flowPublisher.subscribe(subscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Out] = subscriber +} + +object OnCompleteSink { + private val SuccessUnit = Success[Unit](()) +} + +/** + * When the flow is completed, either through an error or normal + * completion, apply the provided function with [[scala.util.Success]] + * or [[scala.util.Failure]]. + */ +final case class OnCompleteSink[Out](callback: Try[Unit] ⇒ Unit) extends SimpleSink[Out] { + override def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + FlowFrom(flowPublisher).transform("onCompleteSink", () ⇒ new Transformer[Out, Unit] { + override def onNext(in: Out) = Nil + override def onError(e: Throwable) = { + callback(Failure(e)) + throw e + } + override def onTermination(e: Option[Throwable]) = { + callback(OnCompleteSink.SuccessUnit) + Nil + } + }).consume()(materializer.withNamePrefix(flowName)) +} + +/** + * Invoke the given procedure for each received element. The sink holds a [[scala.concurrent.Future]] + * that will be completed with `Success` when reaching the normal end of the stream, or completed + * with `Failure` if there is an error is signaled in the stream. + */ +final case class ForeachSink[Out](f: Out ⇒ Unit) extends SinkWithKey[Out, Future[Unit]] { + override def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Future[Unit] = { + val promise = Promise[Unit]() + FlowFrom(flowPublisher).transform("foreach", () ⇒ new Transformer[Out, Unit] { + override def onNext(in: Out) = { f(in); Nil } + override def onError(cause: Throwable): Unit = () + override def onTermination(e: Option[Throwable]) = { + e match { + case None ⇒ promise.success(()) + case Some(e) ⇒ promise.failure(e) + } + Nil + } + }).consume()(materializer.withNamePrefix(flowName)) + promise.future + } + def future(m: MaterializedSink): Future[Unit] = m.getSinkFor(this) +} + diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala new file mode 100644 index 0000000000..e3e5f3e1a0 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala @@ -0,0 +1,271 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.annotation.unchecked.uncheckedVariance +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.util.{ Failure, Success } + +import org.reactivestreams.{ Publisher, Subscriber } + +import akka.stream.impl.{ ActorPublisher, EmptyPublisher, ErrorPublisher, FuturePublisher, IterablePublisher, IteratorPublisher, SimpleCallbackPublisher, TickPublisher } +import akka.stream.impl2.ActorBasedFlowMaterializer + +object FlowFrom { + /** + * Helper to create `Flow` without [[Source]]. + * Example usage: `FlowFrom[Int]` + */ + def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil) + + /** + * Helper to create `Flow` with [[Source]] from `Publisher`. + * + * Construct a transformation starting with given publisher. The transformation steps + * are executed by a series of [[org.reactivestreams.Processor]] instances + * that mediate the flow of elements downstream and the propagation of + * back-pressure upstream. + */ + def apply[T](publisher: Publisher[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(PublisherSource(publisher)) + + /** + * Helper to create `Flow` with [[Source]] from `Iterator`. + * Example usage: `FlowFrom(Seq(1,2,3).iterator)` + * + * Start a new `Flow` from the given Iterator. The produced stream of elements + * will continue until the iterator runs empty or fails during evaluation of + * the `next()` method. Elements are pulled out of the iterator + * in accordance with the demand coming from the downstream transformation + * steps. + */ + def apply[T](iterator: Iterator[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IteratorSource(iterator)) + + /** + * Helper to create `Flow` with [[Source]] from `Iterable`. + * Example usage: `FlowFrom(Seq(1,2,3))` + * + * Starts a new `Flow` from the given `Iterable`. This is like starting from an + * Iterator, but every Subscriber directly attached to the Publisher of this + * stream will see an individual flow of elements (always starting from the + * beginning) regardless of when they subscribed. + */ + def apply[T](iterable: immutable.Iterable[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IterableSource(iterable)) + + /** + * Define the sequence of elements to be produced by the given closure. + * The stream ends normally when evaluation of the closure returns a `None`. + * The stream ends exceptionally when an exception is thrown from the closure. + */ + def apply[T](f: () ⇒ Option[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(ThunkSource(f)) + + /** + * Start a new `Flow` from the given `Future`. The stream will consist of + * one element when the `Future` is completed with a successful value, which + * may happen before or after materializing the `Flow`. + * The stream terminates with an error if the `Future` is completed with a failure. + */ + def apply[T](future: Future[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(FutureSource(future)) + + /** + * Elements are produced from the tick closure periodically with the specified interval. + * The tick element will be delivered to downstream consumers that has requested any elements. + * If a consumer has not requested any elements at the point in time when the tick + * element is produced it will not receive that tick element later. It will + * receive new tick elements as soon as it has requested more elements. + */ + def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): FlowWithSource[T, T] = + FlowFrom[T].withSource(TickSource(initialDelay, interval, tick)) + +} + +/** + * This trait is a marker for a pluggable stream source. Concrete instances should + * implement [[SourceWithKey]] or [[SimpleSource]], otherwise a custom [[FlowMaterializer]] + * will have to be used to be able to attach them. + * + * All Sources defined in this package rely upon an ActorBasedFlowMaterializer being + * made available to them in order to use the attach method. Other + * FlowMaterializers can be used but must then implement the functionality of these + * Source nodes themselves (or construct an ActorBasedFlowMaterializer). + */ +trait Source[+In] + +/** + * A source that does not need to create a user-accessible object during materialization. + */ +trait SimpleSource[+In] extends Source[In] { + /** + * Attach this source to the given [[org.reactivestreams.Subscriber]]. Using the given + * [[FlowMaterializer]] is completely optional, especially if this source belongs to + * a different Reactive Streams implementation. It is the responsibility of the + * caller to provide a suitable FlowMaterializer that can be used for running + * Flows if necessary. + * + * @param flowSubscriber the Subscriber to produce elements to + * @param materializer a FlowMaterializer that may be used for creating flows + * @param flowName the name of the current flow, which should be used in log statements or error messages + */ + def attach(flowSubscriber: Subscriber[In] @uncheckedVariance, materializer: ActorBasedFlowMaterializer, flowName: String): Unit + /** + * This method is only used for Sources that return true from [[#isActive]], which then must + * implement it. + */ + def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] @uncheckedVariance = + throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true") + /** + * This method indicates whether this Source can create a Publisher instead of being + * attached to a Subscriber. This is only used if the Flow does not contain any + * operations. + */ + def isActive: Boolean = false +} + +/** + * A source that will create an object during materialization that the user will need + * to retrieve in order to access aspects of this source (could be a Subscriber, a + * Future/Promise, etc.). + */ +trait SourceWithKey[+In, T] extends Source[In] { + /** + * Attach this source to the given [[org.reactivestreams.Subscriber]]. Using the given + * [[FlowMaterializer]] is completely optional, especially if this source belongs to + * a different Reactive Streams implementation. It is the responsibility of the + * caller to provide a suitable FlowMaterializer that can be used for running + * Flows if necessary. + * + * @param flowSubscriber the Subscriber to produce elements to + * @param materializer a FlowMaterializer that may be used for creating flows + * @param flowName the name of the current flow, which should be used in log statements or error messages + */ + def attach(flowSubscriber: Subscriber[In] @uncheckedVariance, materializer: ActorBasedFlowMaterializer, flowName: String): T + /** + * This method is only used for Sources that return true from [[#isActive]], which then must + * implement it. + */ + def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Publisher[In] @uncheckedVariance, T) = + throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true") + /** + * This method indicates whether this Source can create a Publisher instead of being + * attached to a Subscriber. This is only used if the Flow does not contain any + * operations. + */ + def isActive: Boolean = false + + // these are unique keys, case class equality would break them + final override def equals(other: Any): Boolean = super.equals(other) + final override def hashCode: Int = super.hashCode +} + +/** + * Holds a `Subscriber` representing the input side of the flow. + * The `Subscriber` can later be connected to an upstream `Publisher`. + */ +final case class SubscriberSource[In]() extends SourceWithKey[In, Subscriber[In]] { + override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[In] = + flowSubscriber + + def subscriber(m: MaterializedSource): Subscriber[In] = m.getSourceFor(this) +} + +/** + * Construct a transformation starting with given publisher. The transformation steps + * are executed by a series of [[org.reactivestreams.Processor]] instances + * that mediate the flow of elements downstream and the propagation of + * back-pressure upstream. + */ +final case class PublisherSource[In](p: Publisher[In]) extends SimpleSource[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + p.subscribe(flowSubscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = p +} + +/** + * Start a new `Flow` from the given Iterator. The produced stream of elements + * will continue until the iterator runs empty or fails during evaluation of + * the `next()` method. Elements are pulled out of the iterator + * in accordance with the demand coming from the downstream transformation + * steps. + */ +final case class IteratorSource[In](iterator: Iterator[In]) extends SimpleSource[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + create(materializer, flowName).subscribe(flowSubscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = + if (iterator.isEmpty) EmptyPublisher[In] + else ActorPublisher[In](materializer.actorOf(IteratorPublisher.props(iterator, materializer.settings), + name = s"$flowName-0-iterator")) +} + +/** + * Starts a new `Flow` from the given `Iterable`. This is like starting from an + * Iterator, but every Subscriber directly attached to the Publisher of this + * stream will see an individual flow of elements (always starting from the + * beginning) regardless of when they subscribed. + */ +final case class IterableSource[In](iterable: immutable.Iterable[In]) extends SimpleSource[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + create(materializer, flowName).subscribe(flowSubscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = + if (iterable.isEmpty) EmptyPublisher[In] + else ActorPublisher[In](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings), + name = s"$flowName-0-iterable"), Some(iterable)) +} + +/** + * Define the sequence of elements to be produced by the given closure. + * The stream ends normally when evaluation of the closure returns a `None`. + * The stream ends exceptionally when an exception is thrown from the closure. + */ +final case class ThunkSource[In](f: () ⇒ Option[In]) extends SimpleSource[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + create(materializer, flowName).subscribe(flowSubscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = + ActorPublisher[In](materializer.actorOf(SimpleCallbackPublisher.props(materializer.settings, f), + name = s"$flowName-0-thunk")) +} + +/** + * Start a new `Flow` from the given `Future`. The stream will consist of + * one element when the `Future` is completed with a successful value, which + * may happen before or after materializing the `Flow`. + * The stream terminates with an error if the `Future` is completed with a failure. + */ +final case class FutureSource[In](future: Future[In]) extends SimpleSource[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + create(materializer, flowName).subscribe(flowSubscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = + future.value match { + case Some(Success(element)) ⇒ + ActorPublisher[In](materializer.actorOf(IterablePublisher.props(List(element), materializer.settings), + name = s"$flowName-0-future"), Some(future)) + case Some(Failure(t)) ⇒ + ErrorPublisher(t).asInstanceOf[Publisher[In]] + case None ⇒ + ActorPublisher[In](materializer.actorOf(FuturePublisher.props(future, materializer.settings), + name = s"$flowName-0-future"), Some(future)) + } +} + +/** + * Elements are produced from the tick closure periodically with the specified interval. + * The tick element will be delivered to downstream consumers that has requested any elements. + * If a consumer has not requested any elements at the point in time when the tick + * element is produced it will not receive that tick element later. It will + * receive new tick elements as soon as it has requested more elements. + */ +final case class TickSource[In](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ In) extends SimpleSource[In] { + override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit = + create(materializer, flowName).subscribe(flowSubscriber) + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = + ActorPublisher[In](materializer.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings), + name = s"$flowName-0-tick")) +} + diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowToFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowToFutureSpec.scala index d504b9f2c0..e9da74ef59 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowToFutureSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowToFutureSpec.scala @@ -31,6 +31,19 @@ class FlowToFutureSpec extends AkkaSpec with ScriptedTest { proc.expectCancellation() } + "yield the first value when actively constructing" in { + val p = StreamTestKit.PublisherProbe[Int]() + val f = FutureSink[Int] + val s = SubscriberSource[Int] + val m = FlowFrom[Int].withSource(s).withSink(f).run() + p.subscribe(s.subscriber(m)) + val proc = p.expectSubscription + proc.expectRequest() + proc.sendNext(42) + Await.result(f.future(m), 100.millis) should be(42) + proc.expectCancellation() + } + "yield the first error" in { val p = StreamTestKit.PublisherProbe[Int]() val f = FutureSink[Int]