=str #15755 #15756 rework Source/Sink materialization

The philosophy is that the FlowMaterializer has complete control over
how it interprets the AST, no restrictions. Therefore it only involves
one specified method: materialize() which returns a MaterializedFlow.
Within the ActorBasedFlowMaterializer we materialize Sources and Sinks
that implement the specified SimpleSource/SourceWithKey interfaces (same
for Sinks), others are not supported. These traits are extensible and
they require that an ActorBasedFlowMaterializer is passed into the
factory methods. Other materializers can of course interpret these AST
nodes differently, or they can use the actor-based facilities by
creating a suitable materializer for them to use.

This means that everything is fully extensible, but the infrastructure
we provide concretely for ourselves is built exactly for that and
nothing more. Overgeneralization would just lead nowhere.

Also made FutureSink isActive and implement it using a light-weight
Subscriber instead of a Flow/Transformer.
This commit is contained in:
Roland Kuhn 2014-09-03 21:54:18 +02:00
parent b7a509ec3e
commit 61b77ea50c
6 changed files with 592 additions and 437 deletions

View file

@ -4,37 +4,18 @@
package akka.stream.impl2 package akka.stream.impl2
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import akka.actor.{ Actor, ActorCell, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider, LocalActorRef, Props, RepointableActorRef }
import akka.pattern.ask
import org.reactivestreams.{ Processor, Publisher, Subscriber }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.{ Await, Future } import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{ Failure, Success } import org.reactivestreams.{ Processor, Publisher, Subscriber }
import akka.stream.Transformer
import akka.stream.scaladsl2.FlowMaterializer import akka.actor._
import akka.stream.MaterializerSettings import akka.pattern.ask
import akka.stream.impl.ActorPublisher import akka.stream.{ MaterializerSettings, Transformer }
import akka.stream.impl.IterablePublisher import akka.stream.impl.{ ActorProcessor, ActorPublisher, ExposedPublisher, TransformProcessorImpl }
import akka.stream.impl.IteratorPublisher import akka.stream.scaladsl2._
import akka.stream.impl.TransformProcessorImpl
import akka.stream.impl.ActorProcessor
import akka.stream.impl.ExposedPublisher
import akka.stream.scaladsl2.Source
import akka.stream.scaladsl2.Sink
import akka.stream.scaladsl2.MaterializedFlow
import akka.stream.scaladsl2.IterableSource
import akka.stream.impl.EmptyPublisher
import akka.stream.scaladsl2.IteratorSource
import akka.stream.scaladsl2.PublisherSource
import akka.stream.scaladsl2.ThunkSource
import akka.stream.impl.SimpleCallbackPublisher
import akka.stream.scaladsl2.FutureSource
import akka.stream.impl.FuturePublisher
import akka.stream.impl.ErrorPublisher
import akka.stream.impl.TickPublisher
import akka.stream.scaladsl2.TickSource
/** /**
* INTERNAL API * INTERNAL API
@ -51,12 +32,12 @@ private[akka] object Ast {
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class ActorBasedFlowMaterializer( case class ActorBasedFlowMaterializer(override val settings: MaterializerSettings,
override val settings: MaterializerSettings, supervisor: ActorRef,
supervisor: ActorRef, flowNameCounter: AtomicLong,
flowNameCounter: AtomicLong, namePrefix: String)
namePrefix: String)
extends FlowMaterializer(settings) { extends FlowMaterializer(settings) {
import akka.stream.impl2.Ast._ import akka.stream.impl2.Ast._
def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name) def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name)
@ -80,87 +61,67 @@ private[akka] case class ActorBasedFlowMaterializer(
override def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow = { override def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow = {
val flowName = createFlowName() val flowName = createFlowName()
// FIXME specialcasing, otherwise some tests fail in FlowIterableSpec due to the injected identityProcessor: def attachSink(pub: Publisher[Out]) = sink match {
// - "have value equality of publisher" case s: SimpleSink[Out] s.attach(pub, this, flowName)
// - "produce elements to later subscriber" case s: SinkWithKey[Out, _] s.attach(pub, this, flowName)
def specialCase: PartialFunction[Source[In], Publisher[Out]] = { case _ throw new MaterializationException("unknown Sink type " + sink.getClass)
case PublisherSource(p) p.asInstanceOf[Publisher[Out]] }
case src: IterableSource[In] materializeSource(src, flowName).asInstanceOf[Publisher[Out]] def attachSource(sub: Subscriber[In]) = source match {
case src: IteratorSource[In] materializeSource(src, flowName).asInstanceOf[Publisher[Out]] case s: SimpleSource[In] s.attach(sub, this, flowName)
case src: TickSource[In] materializeSource(src, flowName).asInstanceOf[Publisher[Out]] case s: SourceWithKey[In, _] s.attach(sub, this, flowName)
case _ throw new MaterializationException("unknown Source type " + sink.getClass)
}
def createSink() = sink.asInstanceOf[Sink[In]] match {
case s: SimpleSink[In] s.create(this, flowName) -> (())
case s: SinkWithKey[In, _] s.create(this, flowName)
case _ throw new MaterializationException("unknown Sink type " + sink.getClass)
}
def createSource() = source.asInstanceOf[Source[Out]] match {
case s: SimpleSource[Out] s.create(this, flowName) -> (())
case s: SourceWithKey[Out, _] s.create(this, flowName)
case _ throw new MaterializationException("unknown Source type " + sink.getClass)
}
def isActive(s: AnyRef) = s match {
case source: SimpleSource[_] source.isActive
case source: SourceWithKey[_, _] source.isActive
case sink: SimpleSink[_] sink.isActive
case sink: SinkWithKey[_, _] sink.isActive
case _: Source[_] throw new MaterializationException("unknown Source type " + sink.getClass)
case _: Sink[_] throw new MaterializationException("unknown Sink type " + sink.getClass)
} }
if (ops.isEmpty && specialCase.isDefinedAt(source)) { val (sourceValue, sinkValue) =
val p = specialCase(source) if (ops.isEmpty) {
val sinkValue = sink.attach(p, this) if (isActive(sink)) {
new MaterializedFlow(source, None, sink, sinkValue) val (sub, value) = createSink()
} else { (attachSource(sub), value)
val (s, p) = } else if (isActive(source)) {
if (ops.isEmpty) { val (pub, value) = createSource()
val identityProcessor: Processor[In, Out] = processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[In, Out]] (value, attachSink(pub))
(identityProcessor, identityProcessor)
} else { } else {
val opsSize = ops.size val id: Processor[In, Out] = processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[In, Out]]
val outProcessor = processorForNode(ops.head, flowName, opsSize).asInstanceOf[Processor[In, Out]] (attachSource(id), attachSink(id))
val topSubscriber = processorChain(outProcessor, ops.tail, flowName, opsSize - 1).asInstanceOf[Processor[In, Out]]
(topSubscriber, outProcessor)
} }
val sourceValue = source.attach(s, this, flowName) } else {
val sinkValue = sink.attach(p, this) val opsSize = ops.size
new MaterializedFlow(source, sourceValue, sink, sinkValue) val last = processorForNode(ops.head, flowName, opsSize).asInstanceOf[Processor[Any, Out]]
} val first = processorChain(last, ops.tail, flowName, opsSize - 1).asInstanceOf[Processor[In, Any]]
(attachSource(first), attachSink(last))
}
new MaterializedFlow(source, sourceValue, sink, sinkValue)
} }
private def identityProcessor[I](flowName: String): Processor[I, I] =
processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[I, I]]
private val identityTransform = Transform("identity", () private val identityTransform = Transform("identity", ()
new Transformer[Any, Any] { new Transformer[Any, Any] {
override def onNext(element: Any) = List(element) override def onNext(element: Any) = List(element)
}) })
override def materializeSource[In](source: IterableSource[In], flowName: String): Publisher[In] = {
if (source.iterable.isEmpty) EmptyPublisher[In]
else ActorPublisher(actorOf(IterablePublisher.props(source.iterable, settings),
name = s"$flowName-0-iterable"), Some(source.iterable))
}
override def materializeSource[In](source: IteratorSource[In], flowName: String): Publisher[In] = {
if (source.iterator.isEmpty) EmptyPublisher[In]
else ActorPublisher[In](actorOf(IteratorPublisher.props(source.iterator, settings),
name = s"$flowName-0-iterator"))
}
override def materializeSource[In](source: ThunkSource[In], flowName: String): Publisher[In] = {
ActorPublisher[In](actorOf(SimpleCallbackPublisher.props(settings, source.f),
name = s"$flowName-0-thunk"))
}
override def materializeSource[In](source: FutureSource[In], flowName: String): Publisher[In] = {
source.future.value match {
case Some(Success(element))
ActorPublisher[In](actorOf(IterablePublisher.props(List(element), settings),
name = s"$flowName-0-future"), Some(source.future))
case Some(Failure(t))
ErrorPublisher(t).asInstanceOf[Publisher[In]]
case None
ActorPublisher[In](actorOf(FuturePublisher.props(source.future, settings),
name = s"$flowName-0-future"), Some(source.future))
}
}
override def materializeSource[In](source: TickSource[In], flowName: String): Publisher[In] = {
ActorPublisher[In](actorOf(TickPublisher.props(source.initialDelay, source.interval, source.tick, settings),
name = s"$flowName-0-tick"))
}
private def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { private def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
val impl = actorOf(ActorProcessorFactory.props(settings, op), s"$flowName-$n-${op.name}") val impl = actorOf(ActorProcessorFactory.props(settings, op), s"$flowName-$n-${op.name}")
ActorProcessorFactory(impl) ActorProcessorFactory(impl)
} }
private def actorOf(props: Props, name: String): ActorRef = supervisor match { def actorOf(props: Props, name: String): ActorRef = supervisor match {
case ref: LocalActorRef case ref: LocalActorRef
ref.underlying.attachChild(props, name, systemService = false) ref.underlying.attachChild(props, name, systemService = false)
case ref: RepointableActorRef case ref: RepointableActorRef
@ -228,4 +189,4 @@ private[akka] object ActorProcessorFactory {
impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]]) impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]])
p p
} }
} }

View file

@ -21,312 +21,13 @@ import scala.util.Try
import scala.util.Failure import scala.util.Failure
import scala.util.Success import scala.util.Success
/**
* This is the interface from which all concrete Flows inherit. No generic
* operations are presented because the concrete type of Flow (i.e. whether
* it has a [[Source]] or a [[Sink]]) determines what is available.
*/
sealed trait Flow sealed trait Flow
object FlowFrom {
/**
* Helper to create `Flow` without [[Source]].
* Example usage: `FlowFrom[Int]`
*/
def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil)
/**
* Helper to create `Flow` with [[Source]] from `Publisher`.
*
* Construct a transformation starting with given publisher. The transformation steps
* are executed by a series of [[org.reactivestreams.Processor]] instances
* that mediate the flow of elements downstream and the propagation of
* back-pressure upstream.
*/
def apply[T](publisher: Publisher[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(PublisherSource(publisher))
/**
* Helper to create `Flow` with [[Source]] from `Iterator`.
* Example usage: `FlowFrom(Seq(1,2,3).iterator)`
*
* Start a new `Flow` from the given Iterator. The produced stream of elements
* will continue until the iterator runs empty or fails during evaluation of
* the `next()` method. Elements are pulled out of the iterator
* in accordance with the demand coming from the downstream transformation
* steps.
*/
def apply[T](iterator: Iterator[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IteratorSource(iterator))
/**
* Helper to create `Flow` with [[Source]] from `Iterable`.
* Example usage: `FlowFrom(Seq(1,2,3))`
*
* Starts a new `Flow` from the given `Iterable`. This is like starting from an
* Iterator, but every Subscriber directly attached to the Publisher of this
* stream will see an individual flow of elements (always starting from the
* beginning) regardless of when they subscribed.
*/
def apply[T](iterable: immutable.Iterable[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IterableSource(iterable))
/**
* Define the sequence of elements to be produced by the given closure.
* The stream ends normally when evaluation of the closure returns a `None`.
* The stream ends exceptionally when an exception is thrown from the closure.
*/
def apply[T](f: () Option[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(ThunkSource(f))
/**
* Start a new `Flow` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
*/
def apply[T](future: Future[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(FutureSource(future))
/**
* Elements are produced from the tick closure periodically with the specified interval.
* The tick element will be delivered to downstream consumers that has requested any elements.
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): FlowWithSource[T, T] =
FlowFrom[T].withSource(TickSource(initialDelay, interval, tick))
}
trait Source[+In] {
def attach(flowSubscriber: Subscriber[In] @uncheckedVariance, materializer: FlowMaterializer, flowName: String): Any
}
trait SourceKey[+In, T] extends Source[In] {
override def attach(flowSubscriber: Subscriber[In] @uncheckedVariance, materializer: FlowMaterializer, flowName: String): T
// these are unique keys, case class equality would break them
final override def equals(other: Any): Boolean = super.equals(other)
final override def hashCode: Int = super.hashCode
}
/**
* Holds a `Subscriber` representing the input side of the flow.
* The `Subscriber` can later be connected to an upstream `Publisher`.
*/
final case class SubscriberSource[In]() extends SourceKey[In, Subscriber[In]] {
override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): Subscriber[In] =
flowSubscriber
def subscriber(m: MaterializedSource): Subscriber[In] = m.getSourceFor(this)
}
/**
* Construct a transformation starting with given publisher. The transformation steps
* are executed by a series of [[org.reactivestreams.Processor]] instances
* that mediate the flow of elements downstream and the propagation of
* back-pressure upstream.
*/
final case class PublisherSource[In](p: Publisher[In]) extends Source[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = {
p.subscribe(flowSubscriber)
None
}
}
/**
* Start a new `Flow` from the given Iterator. The produced stream of elements
* will continue until the iterator runs empty or fails during evaluation of
* the `next()` method. Elements are pulled out of the iterator
* in accordance with the demand coming from the downstream transformation
* steps.
*/
final case class IteratorSource[In](iterator: Iterator[In]) extends Source[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = {
val p: Publisher[In] = materializer.materializeSource(this, flowName)
p.subscribe(flowSubscriber)
None
}
}
/**
* Starts a new `Flow` from the given `Iterable`. This is like starting from an
* Iterator, but every Subscriber directly attached to the Publisher of this
* stream will see an individual flow of elements (always starting from the
* beginning) regardless of when they subscribed.
*/
final case class IterableSource[In](iterable: immutable.Iterable[In]) extends Source[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = {
val p: Publisher[In] = materializer.materializeSource(this, flowName)
p.subscribe(flowSubscriber)
None
}
}
/**
* Define the sequence of elements to be produced by the given closure.
* The stream ends normally when evaluation of the closure returns a `None`.
* The stream ends exceptionally when an exception is thrown from the closure.
*/
final case class ThunkSource[In](f: () Option[In]) extends Source[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = {
val p: Publisher[In] = materializer.materializeSource(this, flowName)
p.subscribe(flowSubscriber)
None
}
}
/**
* Start a new `Flow` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
*/
final case class FutureSource[In](future: Future[In]) extends Source[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = {
val p: Publisher[In] = materializer.materializeSource(this, flowName)
p.subscribe(flowSubscriber)
None
}
}
/**
* Elements are produced from the tick closure periodically with the specified interval.
* The tick element will be delivered to downstream consumers that has requested any elements.
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
final case class TickSource[In](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () In) extends Source[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: FlowMaterializer, flowName: String): AnyRef = {
val p: Publisher[In] = materializer.materializeSource(this, flowName)
p.subscribe(flowSubscriber)
None
}
}
trait Sink[-Out] {
def attach(flowPublisher: Publisher[Out @uncheckedVariance], materializer: FlowMaterializer): Any
}
trait SinkKey[-Out, T] extends Sink[Out] {
override def attach(flowPublisher: Publisher[Out @uncheckedVariance], materializer: FlowMaterializer): T
// these are unique keys, case class equality would break them
final override def equals(other: Any): Boolean = super.equals(other)
final override def hashCode: Int = super.hashCode
}
/**
* Holds the downstream-most [[org.reactivestreams.Publisher]] interface of the materialized flow.
* The stream will not have any subscribers attached at this point, which means that after prefetching
* elements to fill the internal buffers it will assert back-pressure until
* a subscriber connects and creates demand for elements to be emitted.
*/
object PublisherSink {
private val instance = new PublisherSink[Nothing]
def apply[T]: PublisherSink[T] = instance.asInstanceOf[PublisherSink[T]]
}
class PublisherSink[Out]() extends SinkKey[Out, Publisher[Out]] {
def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): Publisher[Out] = flowPublisher
def publisher(m: MaterializedSink): Publisher[Out] = m.getSinkFor(this)
override def toString: String = "FutureSink"
}
/**
* Holds a [[scala.concurrent.Future]] that will be fulfilled with the first
* thing that is signaled to this stream, which can be either an element (after
* which the upstream subscription is canceled), an error condition (putting
* the Future into the corresponding failed state) or the end-of-stream
* (failing the Future with a NoSuchElementException).
*/
object FutureSink {
private val instance = new FutureSink[Nothing]
def apply[T]: FutureSink[T] = instance.asInstanceOf[FutureSink[T]]
}
class FutureSink[Out] extends SinkKey[Out, Future[Out]] {
def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): Future[Out] = {
val p = Promise[Out]()
FlowFrom(flowPublisher).transform("futureSink", () new Transformer[Out, Unit] {
var done = false
override def onNext(in: Out) = { p success in; done = true; Nil }
override def onError(e: Throwable) = { p failure e }
override def isComplete = done
override def onTermination(e: Option[Throwable]) = { p.tryFailure(new NoSuchElementException("empty stream")); Nil }
}).consume()(materializer)
p.future
}
def future(m: MaterializedSink): Future[Out] = m.getSinkFor(this)
override def toString: String = "FutureSink"
}
/**
* Attaches a subscriber to this stream which will just discard all received
* elements.
*/
final case object BlackholeSink extends Sink[Any] {
override def attach(flowPublisher: Publisher[Any], materializer: FlowMaterializer): AnyRef = {
val s = new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize)
flowPublisher.subscribe(s)
None
}
}
/**
* Attaches a subscriber to this stream.
*/
final case class SubscriberSink[Out](subscriber: Subscriber[Out]) extends Sink[Out] {
override def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): AnyRef = {
flowPublisher.subscribe(subscriber)
None
}
}
object OnCompleteSink {
private val SuccessUnit = Success[Unit](())
}
/**
* When the flow is completed, either through an error or normal
* completion, apply the provided function with [[scala.util.Success]]
* or [[scala.util.Failure]].
*/
final case class OnCompleteSink[Out](callback: Try[Unit] Unit) extends Sink[Out] {
override def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): AnyRef = {
FlowFrom(flowPublisher).transform("onCompleteSink", () new Transformer[Out, Unit] {
override def onNext(in: Out) = Nil
override def onError(e: Throwable) = {
callback(Failure(e))
throw e
}
override def onTermination(e: Option[Throwable]) = {
callback(OnCompleteSink.SuccessUnit)
Nil
}
}).consume()(materializer)
None
}
}
/**
* Invoke the given procedure for each received element. The sink holds a [[scala.concurrent.Future]]
* that will be completed with `Success` when reaching the normal end of the stream, or completed
* with `Failure` if there is an error is signaled in the stream.
*/
final case class ForeachSink[Out](f: Out Unit) extends SinkKey[Out, Future[Unit]] {
override def attach(flowPublisher: Publisher[Out], materializer: FlowMaterializer): Future[Unit] = {
val promise = Promise[Unit]()
FlowFrom(flowPublisher).transform("foreach", () new Transformer[Out, Unit] {
override def onNext(in: Out) = { f(in); Nil }
override def onError(cause: Throwable): Unit = ()
override def onTermination(e: Option[Throwable]) = {
e match {
case None promise.success(())
case Some(e) promise.failure(e)
}
Nil
}
}).consume()(materializer)
promise.future
}
def future(m: MaterializedSink): Future[Unit] = m.getSinkFor(this)
}
/** /**
* Marker interface for flows that have a free (attachable) input side. * Marker interface for flows that have a free (attachable) input side.
*/ */
@ -379,11 +80,11 @@ final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends FlowOps[In
*/ */
final case class FlowWithSink[-In, +Out](private[scaladsl2] val output: Sink[Out @uncheckedVariance], ops: List[AstNode]) extends HasNoSource[In] { final case class FlowWithSink[-In, +Out](private[scaladsl2] val output: Sink[Out @uncheckedVariance], ops: List[AstNode]) extends HasNoSource[In] {
def withSource(in: Source[In]): RunnableFlow[In, Out] = new RunnableFlow(in, output, ops) def withSource(in: Source[In]): RunnableFlow[In, Out] = RunnableFlow(in, output, ops)
def withoutSink: ProcessorFlow[In, Out] = ProcessorFlow(ops) def withoutSink: ProcessorFlow[In, Out] = ProcessorFlow(ops)
def prepend[T](f: ProcessorFlow[T, In]): FlowWithSink[T, Out] = FlowWithSink(output, ops ::: f.ops) def prepend[T](f: ProcessorFlow[T, In]): FlowWithSink[T, Out] = FlowWithSink(output, ops ::: f.ops)
def prepend[T](f: FlowWithSource[T, In]): RunnableFlow[T, Out] = new RunnableFlow(f.input, output, ops ::: f.ops) def prepend[T](f: FlowWithSource[T, In]): RunnableFlow[T, Out] = RunnableFlow(f.input, output, ops ::: f.ops)
def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] = { def toSubscriber()(implicit materializer: FlowMaterializer): Subscriber[In @uncheckedVariance] = {
val subIn = SubscriberSource[In]() val subIn = SubscriberSource[In]()
@ -400,11 +101,11 @@ final case class FlowWithSource[-In, +Out](private[scaladsl2] val input: Source[
override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops) override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops)
def withSink(out: Sink[Out]): RunnableFlow[In, Out] = new RunnableFlow(input, out, ops) def withSink(out: Sink[Out]): RunnableFlow[In, Out] = RunnableFlow(input, out, ops)
def withoutSource: ProcessorFlow[In, Out] = ProcessorFlow(ops) def withoutSource: ProcessorFlow[In, Out] = ProcessorFlow(ops)
def append[T](f: ProcessorFlow[Out, T]): FlowWithSource[In, T] = FlowWithSource(input, f.ops ++: ops) def append[T](f: ProcessorFlow[Out, T]): FlowWithSource[In, T] = FlowWithSource(input, f.ops ++: ops)
def append[T](f: FlowWithSink[Out, T]): RunnableFlow[In, T] = new RunnableFlow(input, f.output, f.ops ++: ops) def append[T](f: FlowWithSink[Out, T]): RunnableFlow[In, T] = RunnableFlow(input, f.output, f.ops ++: ops)
def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = { def toPublisher()(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = {
val pubOut = PublisherSink[Out] val pubOut = PublisherSink[Out]
@ -433,19 +134,19 @@ final case class RunnableFlow[-In, +Out](private[scaladsl2] val input: Source[In
} }
class MaterializedFlow(sourceKey: AnyRef, matSource: Any, sinkKey: AnyRef, matSink: Any) extends MaterializedSource with MaterializedSink { class MaterializedFlow(sourceKey: AnyRef, matSource: Any, sinkKey: AnyRef, matSink: Any) extends MaterializedSource with MaterializedSink {
override def getSourceFor[T](key: SourceKey[_, T]): T = override def getSourceFor[T](key: SourceWithKey[_, T]): T =
if (key == sourceKey) matSource.asInstanceOf[T] if (key == sourceKey) matSource.asInstanceOf[T]
else throw new IllegalArgumentException(s"Source key [$key] doesn't match the source [$sourceKey] of this flow") else throw new IllegalArgumentException(s"Source key [$key] doesn't match the source [$sourceKey] of this flow")
def getSinkFor[T](key: SinkKey[_, T]): T = def getSinkFor[T](key: SinkWithKey[_, T]): T =
if (key == sinkKey) matSink.asInstanceOf[T] if (key == sinkKey) matSink.asInstanceOf[T]
else throw new IllegalArgumentException(s"Sink key [$key] doesn't match the sink [$sinkKey] of this flow") else throw new IllegalArgumentException(s"Sink key [$key] doesn't match the sink [$sinkKey] of this flow")
} }
trait MaterializedSource { trait MaterializedSource {
def getSourceFor[T](sourceKey: SourceKey[_, T]): T def getSourceFor[T](sourceKey: SourceWithKey[_, T]): T
} }
trait MaterializedSink { trait MaterializedSink {
def getSinkFor[T](sinkKey: SinkKey[_, T]): T def getSinkFor[T](sinkKey: SinkWithKey[_, T]): T
} }

View file

@ -3,17 +3,9 @@
*/ */
package akka.stream.scaladsl2 package akka.stream.scaladsl2
import scala.concurrent.duration._ import akka.actor.{ ActorContext, ActorRefFactory, ActorSystem, ExtendedActorSystem }
import org.reactivestreams.Publisher
import akka.actor.ActorContext
import akka.actor.ActorRefFactory
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.stream.MaterializerSettings import akka.stream.MaterializerSettings
import akka.stream.impl2.ActorBasedFlowMaterializer import akka.stream.impl2.{ ActorBasedFlowMaterializer, Ast, FlowNameCounter, StreamSupervisor }
import akka.stream.impl2.Ast
import akka.stream.impl2.FlowNameCounter
import akka.stream.impl2.StreamSupervisor
object FlowMaterializer { object FlowMaterializer {
@ -131,26 +123,25 @@ object FlowMaterializer {
abstract class FlowMaterializer(val settings: MaterializerSettings) { abstract class FlowMaterializer(val settings: MaterializerSettings) {
/** /**
* The `namePrefix` is used as the first part of the names of the actors running * The `namePrefix` shall be used for deriving the names of processing
* the processing steps. * entities that are created during materialization. This is meant to aid
* logging and error reporting both during materialization and while the
* stream is running.
*/ */
def withNamePrefix(name: String): FlowMaterializer def withNamePrefix(name: String): FlowMaterializer
/** /**
* INTERNAL API * This method interprets the given Flow description and creates the running
* ops are stored in reverse order * stream. The result can be highly implementation specific, ranging from
* local actor chains to remote-deployed processing networks.
*/ */
private[akka] def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow
def materializeSource[In](source: IterableSource[In], flowName: String): Publisher[In]
def materializeSource[In](source: IteratorSource[In], flowName: String): Publisher[In]
def materializeSource[In](source: ThunkSource[In], flowName: String): Publisher[In]
def materializeSource[In](source: FutureSource[In], flowName: String): Publisher[In]
def materializeSource[In](source: TickSource[In], flowName: String): Publisher[In]
} }
/**
* This exception or subtypes thereof should be used to signal materialization
* failures.
*/
class MaterializationException(msg: String, cause: Throwable = null) extends RuntimeException(msg, cause)

View file

@ -0,0 +1,218 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success, Try }
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import akka.stream.Transformer
import akka.stream.impl.BlackholeSubscriber
import akka.stream.impl2.ActorBasedFlowMaterializer
import java.util.concurrent.atomic.AtomicReference
/**
* This trait is a marker for a pluggable stream sink. Concrete instances should
* implement [[SinkWithKey]] or [[SimpleSink]], otherwise a custom [[FlowMaterializer]]
* will have to be used to be able to attach them.
*
* All Sinks defined in this package rely upon an [[ActorBasedFlowMaterializer]] being
* made available to them in order to use the <code>attach</code> method. Other
* FlowMaterializers can be used but must then implement the functionality of these
* Sink nodes themselves (or construct an ActorBasedFlowMaterializer).
*/
trait Sink[-Out]
/**
* A sink that does not need to create a user-accessible object during materialization.
*/
trait SimpleSink[-Out] extends Sink[Out] {
/**
* Attach this sink to the given [[org.reactivestreams.Publisher]]. Using the given
* [[FlowMaterializer]] is completely optional, especially if this sink belongs to
* a different Reactive Streams implementation. It is the responsibility of the
* caller to provide a suitable FlowMaterializer that can be used for running
* Flows if necessary.
*
* @param flowPublisher the Publisher to consume elements from
* @param materializer a FlowMaterializer that may be used for creating flows
* @param flowName the name of the current flow, which should be used in log statements or error messages
*/
def attach(flowPublisher: Publisher[Out @uncheckedVariance], materializer: ActorBasedFlowMaterializer, flowName: String): Unit
/**
* This method is only used for Sinks that return true from [[#isActive]], which then must
* implement it.
*/
def create(materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Out] @uncheckedVariance =
throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true")
/**
* This method indicates whether this Sink can create a Subscriber instead of being
* attached to a Publisher. This is only used if the Flow does not contain any
* operations.
*/
def isActive: Boolean = false
}
/**
* A sink that will create an object during materialization that the user will need
* to retrieve in order to access aspects of this sink (could be a completion Future
* or a cancellation handle, etc.)
*/
trait SinkWithKey[-Out, T] extends Sink[Out] {
/**
* Attach this sink to the given [[org.reactivestreams.Publisher]]. Using the given
* [[FlowMaterializer]] is completely optional, especially if this sink belongs to
* a different Reactive Streams implementation. It is the responsibility of the
* caller to provide a suitable FlowMaterializer that can be used for running
* Flows if necessary.
*
* @param flowPublisher the Publisher to consume elements from
* @param materializer a FlowMaterializer that may be used for creating flows
* @param flowName the name of the current flow, which should be used in log statements or error messages
*/
def attach(flowPublisher: Publisher[Out @uncheckedVariance], materializer: ActorBasedFlowMaterializer, flowName: String): T
/**
* This method is only used for Sinks that return true from [[#isActive]], which then must
* implement it.
*/
def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Subscriber[Out] @uncheckedVariance, T) =
throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true")
/**
* This method indicates whether this Sink can create a Subscriber instead of being
* attached to a Publisher. This is only used if the Flow does not contain any
* operations.
*/
def isActive: Boolean = false
// these are unique keys, case class equality would break them
final override def equals(other: Any): Boolean = super.equals(other)
final override def hashCode: Int = super.hashCode
}
/**
* Holds the downstream-most [[org.reactivestreams.Publisher]] interface of the materialized flow.
* The stream will not have any subscribers attached at this point, which means that after prefetching
* elements to fill the internal buffers it will assert back-pressure until
* a subscriber connects and creates demand for elements to be emitted.
*/
object PublisherSink {
private val instance = new PublisherSink[Nothing]
def apply[T]: PublisherSink[T] = instance.asInstanceOf[PublisherSink[T]]
}
class PublisherSink[Out]() extends SinkWithKey[Out, Publisher[Out]] {
def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[Out] = flowPublisher
def publisher(m: MaterializedSink): Publisher[Out] = m.getSinkFor(this)
override def toString: String = "PublisherSink"
}
/**
* Holds a [[scala.concurrent.Future]] that will be fulfilled with the first
* thing that is signaled to this stream, which can be either an element (after
* which the upstream subscription is canceled), an error condition (putting
* the Future into the corresponding failed state) or the end-of-stream
* (failing the Future with a NoSuchElementException).
*/
object FutureSink {
private val instance = new FutureSink[Nothing]
def apply[T]: FutureSink[T] = instance.asInstanceOf[FutureSink[T]]
}
class FutureSink[Out] extends SinkWithKey[Out, Future[Out]] {
def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Future[Out] = {
val (sub, f) = create(materializer, flowName)
flowPublisher.subscribe(sub)
f
}
override def isActive = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Subscriber[Out], Future[Out]) = {
val p = Promise[Out]()
val sub = new Subscriber[Out] { // TODO #15804 verify this using the RS TCK
private val sub = new AtomicReference[Subscription]
override def onSubscribe(s: Subscription): Unit =
if (!sub.compareAndSet(null, s)) s.cancel()
else s.request(1)
override def onNext(t: Out): Unit = { p.trySuccess(t); sub.get.cancel() }
override def onError(t: Throwable): Unit = p.tryFailure(t)
override def onComplete(): Unit = p.tryFailure(new NoSuchElementException("empty stream"))
}
(sub, p.future)
}
def future(m: MaterializedSink): Future[Out] = m.getSinkFor(this)
override def toString: String = "FutureSink"
}
/**
* Attaches a subscriber to this stream which will just discard all received
* elements.
*/
final case object BlackholeSink extends SimpleSink[Any] {
override def attach(flowPublisher: Publisher[Any], materializer: ActorBasedFlowMaterializer, flowName: String): Unit =
flowPublisher.subscribe(create(materializer, flowName))
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Any] =
new BlackholeSubscriber[Any](materializer.settings.maxInputBufferSize)
}
/**
* Attaches a subscriber to this stream.
*/
final case class SubscriberSink[Out](subscriber: Subscriber[Out]) extends SimpleSink[Out] {
override def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Unit =
flowPublisher.subscribe(subscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[Out] = subscriber
}
object OnCompleteSink {
private val SuccessUnit = Success[Unit](())
}
/**
* When the flow is completed, either through an error or normal
* completion, apply the provided function with [[scala.util.Success]]
* or [[scala.util.Failure]].
*/
final case class OnCompleteSink[Out](callback: Try[Unit] Unit) extends SimpleSink[Out] {
override def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Unit =
FlowFrom(flowPublisher).transform("onCompleteSink", () new Transformer[Out, Unit] {
override def onNext(in: Out) = Nil
override def onError(e: Throwable) = {
callback(Failure(e))
throw e
}
override def onTermination(e: Option[Throwable]) = {
callback(OnCompleteSink.SuccessUnit)
Nil
}
}).consume()(materializer.withNamePrefix(flowName))
}
/**
* Invoke the given procedure for each received element. The sink holds a [[scala.concurrent.Future]]
* that will be completed with `Success` when reaching the normal end of the stream, or completed
* with `Failure` if there is an error is signaled in the stream.
*/
final case class ForeachSink[Out](f: Out Unit) extends SinkWithKey[Out, Future[Unit]] {
override def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Future[Unit] = {
val promise = Promise[Unit]()
FlowFrom(flowPublisher).transform("foreach", () new Transformer[Out, Unit] {
override def onNext(in: Out) = { f(in); Nil }
override def onError(cause: Throwable): Unit = ()
override def onTermination(e: Option[Throwable]) = {
e match {
case None promise.success(())
case Some(e) promise.failure(e)
}
Nil
}
}).consume()(materializer.withNamePrefix(flowName))
promise.future
}
def future(m: MaterializedSink): Future[Unit] = m.getSinkFor(this)
}

View file

@ -0,0 +1,271 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.{ Failure, Success }
import org.reactivestreams.{ Publisher, Subscriber }
import akka.stream.impl.{ ActorPublisher, EmptyPublisher, ErrorPublisher, FuturePublisher, IterablePublisher, IteratorPublisher, SimpleCallbackPublisher, TickPublisher }
import akka.stream.impl2.ActorBasedFlowMaterializer
object FlowFrom {
/**
* Helper to create `Flow` without [[Source]].
* Example usage: `FlowFrom[Int]`
*/
def apply[T]: ProcessorFlow[T, T] = ProcessorFlow[T, T](Nil)
/**
* Helper to create `Flow` with [[Source]] from `Publisher`.
*
* Construct a transformation starting with given publisher. The transformation steps
* are executed by a series of [[org.reactivestreams.Processor]] instances
* that mediate the flow of elements downstream and the propagation of
* back-pressure upstream.
*/
def apply[T](publisher: Publisher[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(PublisherSource(publisher))
/**
* Helper to create `Flow` with [[Source]] from `Iterator`.
* Example usage: `FlowFrom(Seq(1,2,3).iterator)`
*
* Start a new `Flow` from the given Iterator. The produced stream of elements
* will continue until the iterator runs empty or fails during evaluation of
* the `next()` method. Elements are pulled out of the iterator
* in accordance with the demand coming from the downstream transformation
* steps.
*/
def apply[T](iterator: Iterator[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IteratorSource(iterator))
/**
* Helper to create `Flow` with [[Source]] from `Iterable`.
* Example usage: `FlowFrom(Seq(1,2,3))`
*
* Starts a new `Flow` from the given `Iterable`. This is like starting from an
* Iterator, but every Subscriber directly attached to the Publisher of this
* stream will see an individual flow of elements (always starting from the
* beginning) regardless of when they subscribed.
*/
def apply[T](iterable: immutable.Iterable[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(IterableSource(iterable))
/**
* Define the sequence of elements to be produced by the given closure.
* The stream ends normally when evaluation of the closure returns a `None`.
* The stream ends exceptionally when an exception is thrown from the closure.
*/
def apply[T](f: () Option[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(ThunkSource(f))
/**
* Start a new `Flow` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
*/
def apply[T](future: Future[T]): FlowWithSource[T, T] = FlowFrom[T].withSource(FutureSource(future))
/**
* Elements are produced from the tick closure periodically with the specified interval.
* The tick element will be delivered to downstream consumers that has requested any elements.
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): FlowWithSource[T, T] =
FlowFrom[T].withSource(TickSource(initialDelay, interval, tick))
}
/**
* This trait is a marker for a pluggable stream source. Concrete instances should
* implement [[SourceWithKey]] or [[SimpleSource]], otherwise a custom [[FlowMaterializer]]
* will have to be used to be able to attach them.
*
* All Sources defined in this package rely upon an ActorBasedFlowMaterializer being
* made available to them in order to use the <code>attach</code> method. Other
* FlowMaterializers can be used but must then implement the functionality of these
* Source nodes themselves (or construct an ActorBasedFlowMaterializer).
*/
trait Source[+In]
/**
* A source that does not need to create a user-accessible object during materialization.
*/
trait SimpleSource[+In] extends Source[In] {
/**
* Attach this source to the given [[org.reactivestreams.Subscriber]]. Using the given
* [[FlowMaterializer]] is completely optional, especially if this source belongs to
* a different Reactive Streams implementation. It is the responsibility of the
* caller to provide a suitable FlowMaterializer that can be used for running
* Flows if necessary.
*
* @param flowSubscriber the Subscriber to produce elements to
* @param materializer a FlowMaterializer that may be used for creating flows
* @param flowName the name of the current flow, which should be used in log statements or error messages
*/
def attach(flowSubscriber: Subscriber[In] @uncheckedVariance, materializer: ActorBasedFlowMaterializer, flowName: String): Unit
/**
* This method is only used for Sources that return true from [[#isActive]], which then must
* implement it.
*/
def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] @uncheckedVariance =
throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true")
/**
* This method indicates whether this Source can create a Publisher instead of being
* attached to a Subscriber. This is only used if the Flow does not contain any
* operations.
*/
def isActive: Boolean = false
}
/**
* A source that will create an object during materialization that the user will need
* to retrieve in order to access aspects of this source (could be a Subscriber, a
* Future/Promise, etc.).
*/
trait SourceWithKey[+In, T] extends Source[In] {
/**
* Attach this source to the given [[org.reactivestreams.Subscriber]]. Using the given
* [[FlowMaterializer]] is completely optional, especially if this source belongs to
* a different Reactive Streams implementation. It is the responsibility of the
* caller to provide a suitable FlowMaterializer that can be used for running
* Flows if necessary.
*
* @param flowSubscriber the Subscriber to produce elements to
* @param materializer a FlowMaterializer that may be used for creating flows
* @param flowName the name of the current flow, which should be used in log statements or error messages
*/
def attach(flowSubscriber: Subscriber[In] @uncheckedVariance, materializer: ActorBasedFlowMaterializer, flowName: String): T
/**
* This method is only used for Sources that return true from [[#isActive]], which then must
* implement it.
*/
def create(materializer: ActorBasedFlowMaterializer, flowName: String): (Publisher[In] @uncheckedVariance, T) =
throw new UnsupportedOperationException(s"forgot to implement create() for $getClass that says isActive==true")
/**
* This method indicates whether this Source can create a Publisher instead of being
* attached to a Subscriber. This is only used if the Flow does not contain any
* operations.
*/
def isActive: Boolean = false
// these are unique keys, case class equality would break them
final override def equals(other: Any): Boolean = super.equals(other)
final override def hashCode: Int = super.hashCode
}
/**
* Holds a `Subscriber` representing the input side of the flow.
* The `Subscriber` can later be connected to an upstream `Publisher`.
*/
final case class SubscriberSource[In]() extends SourceWithKey[In, Subscriber[In]] {
override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Subscriber[In] =
flowSubscriber
def subscriber(m: MaterializedSource): Subscriber[In] = m.getSourceFor(this)
}
/**
* Construct a transformation starting with given publisher. The transformation steps
* are executed by a series of [[org.reactivestreams.Processor]] instances
* that mediate the flow of elements downstream and the propagation of
* back-pressure upstream.
*/
final case class PublisherSource[In](p: Publisher[In]) extends SimpleSource[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit =
p.subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] = p
}
/**
* Start a new `Flow` from the given Iterator. The produced stream of elements
* will continue until the iterator runs empty or fails during evaluation of
* the `next()` method. Elements are pulled out of the iterator
* in accordance with the demand coming from the downstream transformation
* steps.
*/
final case class IteratorSource[In](iterator: Iterator[In]) extends SimpleSource[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit =
create(materializer, flowName).subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] =
if (iterator.isEmpty) EmptyPublisher[In]
else ActorPublisher[In](materializer.actorOf(IteratorPublisher.props(iterator, materializer.settings),
name = s"$flowName-0-iterator"))
}
/**
* Starts a new `Flow` from the given `Iterable`. This is like starting from an
* Iterator, but every Subscriber directly attached to the Publisher of this
* stream will see an individual flow of elements (always starting from the
* beginning) regardless of when they subscribed.
*/
final case class IterableSource[In](iterable: immutable.Iterable[In]) extends SimpleSource[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit =
create(materializer, flowName).subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] =
if (iterable.isEmpty) EmptyPublisher[In]
else ActorPublisher[In](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings),
name = s"$flowName-0-iterable"), Some(iterable))
}
/**
* Define the sequence of elements to be produced by the given closure.
* The stream ends normally when evaluation of the closure returns a `None`.
* The stream ends exceptionally when an exception is thrown from the closure.
*/
final case class ThunkSource[In](f: () Option[In]) extends SimpleSource[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit =
create(materializer, flowName).subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] =
ActorPublisher[In](materializer.actorOf(SimpleCallbackPublisher.props(materializer.settings, f),
name = s"$flowName-0-thunk"))
}
/**
* Start a new `Flow` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
*/
final case class FutureSource[In](future: Future[In]) extends SimpleSource[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit =
create(materializer, flowName).subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] =
future.value match {
case Some(Success(element))
ActorPublisher[In](materializer.actorOf(IterablePublisher.props(List(element), materializer.settings),
name = s"$flowName-0-future"), Some(future))
case Some(Failure(t))
ErrorPublisher(t).asInstanceOf[Publisher[In]]
case None
ActorPublisher[In](materializer.actorOf(FuturePublisher.props(future, materializer.settings),
name = s"$flowName-0-future"), Some(future))
}
}
/**
* Elements are produced from the tick closure periodically with the specified interval.
* The tick element will be delivered to downstream consumers that has requested any elements.
* If a consumer has not requested any elements at the point in time when the tick
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
final case class TickSource[In](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () In) extends SimpleSource[In] {
override def attach(flowSubscriber: Subscriber[In], materializer: ActorBasedFlowMaterializer, flowName: String): Unit =
create(materializer, flowName).subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] =
ActorPublisher[In](materializer.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings),
name = s"$flowName-0-tick"))
}

View file

@ -31,6 +31,19 @@ class FlowToFutureSpec extends AkkaSpec with ScriptedTest {
proc.expectCancellation() proc.expectCancellation()
} }
"yield the first value when actively constructing" in {
val p = StreamTestKit.PublisherProbe[Int]()
val f = FutureSink[Int]
val s = SubscriberSource[Int]
val m = FlowFrom[Int].withSource(s).withSink(f).run()
p.subscribe(s.subscriber(m))
val proc = p.expectSubscription
proc.expectRequest()
proc.sendNext(42)
Await.result(f.future(m), 100.millis) should be(42)
proc.expectCancellation()
}
"yield the first error" in { "yield the first error" in {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val f = FutureSink[Int] val f = FutureSink[Int]