diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala index 3a673ba882..2043656d99 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala @@ -141,33 +141,6 @@ abstract class ActorFlowMaterializer extends FlowMaterializer { } -abstract class FlowMaterializer { - - /** - * The `namePrefix` shall be used for deriving the names of processing - * entities that are created during materialization. This is meant to aid - * logging and failure reporting both during materialization and while the - * stream is running. - */ - def withNamePrefix(name: String): FlowMaterializer - - /** - * This method interprets the given Flow description and creates the running - * stream. The result can be highly implementation specific, ranging from - * local actor chains to remote-deployed processing networks. - */ - def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat - - /** - * Running a flow graph will require execution resources, as will computations - * within Sources, Sinks, etc. This [[scala.concurrent.ExecutionContextExecutor]] - * can be used by parts of the flow to submit processing jobs for execution, - * run Future callbacks, etc. - */ - def executionContext: ExecutionContextExecutor - -} - /** * This exception or subtypes thereof should be used to signal materialization * failures. @@ -261,10 +234,10 @@ final case class ActorFlowMaterializerSettings( def withSupervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): ActorFlowMaterializerSettings = { import Supervision._ copy(supervisionDecider = decider match { - case `resumingDecider` => resumingDecider - case `restartingDecider` => restartingDecider - case `stoppingDecider` => stoppingDecider - case other => other.apply _ + case `resumingDecider` ⇒ resumingDecider + case `restartingDecider` ⇒ restartingDecider + case `stoppingDecider` ⇒ stoppingDecider + case other ⇒ other.apply _ }) } diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala new file mode 100644 index 0000000000..8e9063663f --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.ExecutionContextExecutor +import akka.japi + +abstract class FlowMaterializer { + + /** + * The `namePrefix` shall be used for deriving the names of processing + * entities that are created during materialization. This is meant to aid + * logging and failure reporting both during materialization and while the + * stream is running. + */ + def withNamePrefix(name: String): FlowMaterializer + + /** + * This method interprets the given Flow description and creates the running + * stream. The result can be highly implementation specific, ranging from + * local actor chains to remote-deployed processing networks. + */ + def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat + + /** + * Running a flow graph will require execution resources, as will computations + * within Sources, Sinks, etc. This [[scala.concurrent.ExecutionContextExecutor]] + * can be used by parts of the flow to submit processing jobs for execution, + * run Future callbacks, etc. + */ + def executionContext: ExecutionContextExecutor + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index 436e3f4779..08bae18a57 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -19,6 +19,17 @@ import org.reactivestreams._ import scala.concurrent.{ Await, ExecutionContextExecutor } +object ActorFlowMaterializerImpl { + import OperationAttributes._ + private[akka] def calcSettings(opAttr: OperationAttributes)(settings: ActorFlowMaterializerSettings): ActorFlowMaterializerSettings = + opAttr.attributes.collect { + case InputBuffer(initial, max) ⇒ (s: ActorFlowMaterializerSettings) ⇒ s.withInputBuffer(initial, max) + case Dispatcher(dispatcher) ⇒ (s: ActorFlowMaterializerSettings) ⇒ s.withDispatcher(dispatcher) + case SupervisionStrategy(decider) ⇒ (s: ActorFlowMaterializerSettings) ⇒ + s.withSupervisionStrategy(decider) + }.reduceOption(_ andThen _).getOrElse((x: ActorFlowMaterializerSettings) ⇒ x)(settings) // FIXME is this the optimal way of encoding this? +} + /** * INTERNAL API */ @@ -29,6 +40,7 @@ case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterialize namePrefix: String, optimizations: Optimizations) extends ActorFlowMaterializer { + import ActorFlowMaterializerImpl._ import akka.stream.impl.Stages._ def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name) @@ -49,60 +61,66 @@ case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterialize name } - override protected def materializeAtomic(atomic: Module, effectiveAttributes: OperationAttributes): Any = atomic match { - case sink: SinkModule[_, _] ⇒ - val (sub, mat) = sink.create(ActorFlowMaterializerImpl.this, stageName(effectiveAttributes)) - assignPort(sink.shape.inlet, sub.asInstanceOf[Subscriber[Any]]) - mat - case source: SourceModule[_, _] ⇒ - val (pub, mat) = source.create(ActorFlowMaterializerImpl.this, stageName(effectiveAttributes)) - assignPort(source.shape.outlet, pub.asInstanceOf[Publisher[Any]]) - mat + override protected def materializeAtomic(atomic: Module, effectiveAttributes: OperationAttributes): Any = { - case stage: StageModule ⇒ - val (processor, mat) = processorFor(stage, effectiveAttributes) - assignPort(stage.inPort, processor) - assignPort(stage.outPort, processor) - mat + atomic match { + case sink: SinkModule[_, _] ⇒ + val (sub, mat) = sink.create(ActorFlowMaterializerImpl.this, stageName(effectiveAttributes)) + assignPort(sink.shape.inlet, sub.asInstanceOf[Subscriber[Any]]) + mat + case source: SourceModule[_, _] ⇒ + val (pub, mat) = source.create(ActorFlowMaterializerImpl.this, stageName(effectiveAttributes)) + assignPort(source.shape.outlet, pub.asInstanceOf[Publisher[Any]]) + mat - case junction: JunctionModule ⇒ materializeJunction(junction, effectiveAttributes) + case stage: StageModule ⇒ + val (processor, mat) = processorFor(stage, effectiveAttributes, calcSettings(effectiveAttributes)(settings)) + assignPort(stage.inPort, processor) + assignPort(stage.outPort, processor) + mat + case junction: JunctionModule ⇒ materializeJunction(junction, effectiveAttributes, calcSettings(effectiveAttributes)(settings)) + } } - private def processorFor(op: StageModule, effectiveAttributes: OperationAttributes): (Processor[Any, Any], Any) = op match { + private def processorFor(op: StageModule, + effectiveAttributes: OperationAttributes, + effectiveSettings: ActorFlowMaterializerSettings): (Processor[Any, Any], Any) = op match { case DirectProcessor(processorFactory, _) ⇒ processorFactory() case _ ⇒ val (opprops, mat) = ActorProcessorFactory.props(ActorFlowMaterializerImpl.this, op, effectiveAttributes) val processor = ActorProcessorFactory[Any, Any](actorOf( opprops, stageName(effectiveAttributes), - effectiveAttributes.settings(settings).dispatcher)) + effectiveSettings.dispatcher)) processor -> mat } - private def materializeJunction(op: JunctionModule, effectiveAttributes: OperationAttributes): Unit = { + private def materializeJunction(op: JunctionModule, + effectiveAttributes: OperationAttributes, + effectiveSettings: ActorFlowMaterializerSettings): Unit = { op match { case fanin: FanInModule ⇒ val (props, inputs, output) = fanin match { case MergeModule(shape, _) ⇒ - (FairMerge.props(effectiveAttributes.settings(settings), shape.inArray.size), shape.inArray.toSeq, shape.out) + (FairMerge.props(effectiveSettings, shape.inArray.size), shape.inArray.toSeq, shape.out) case f: FlexiMergeModule[t, p] ⇒ val flexi = f.flexi(f.shape) - (FlexiMerge.props(effectiveAttributes.settings(settings), f.shape, flexi), f.shape.inlets, f.shape.outlets.head) + (FlexiMerge.props(effectiveSettings, f.shape, flexi), f.shape.inlets, f.shape.outlets.head) case MergePreferredModule(shape, _) ⇒ - (UnfairMerge.props(effectiveAttributes.settings(settings), shape.inlets.size), shape.preferred +: shape.inArray.toSeq, shape.out) + (UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inArray.toSeq, shape.out) case ConcatModule(shape, _) ⇒ require(shape.inArray.size == 2, "currently only supporting concatenation of exactly two inputs") // FIXME - (Concat.props(effectiveAttributes.settings(settings)), shape.inArray.toSeq, shape.out) + (Concat.props(effectiveSettings), shape.inArray.toSeq, shape.out) case zip: ZipWithModule ⇒ - (zip.props(effectiveAttributes.settings(settings)), zip.shape.inlets, zip.outPorts.head) + (zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head) } - val impl = actorOf(props, stageName(effectiveAttributes), effectiveAttributes.settings(settings).dispatcher) + val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher) val publisher = new ActorPublisher[Any](impl) impl ! ExposedPublisher(publisher) for ((in, id) ← inputs.zipWithIndex) { @@ -115,18 +133,18 @@ case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterialize case r: FlexiRouteModule[t, p] ⇒ val flexi = r.flexi(r.shape) - (FlexiRoute.props(effectiveAttributes.settings(settings), r.shape, flexi), r.shape.inlets.head: InPort, r.shape.outlets) + (FlexiRoute.props(effectiveSettings, r.shape, flexi), r.shape.inlets.head: InPort, r.shape.outlets) case BroadcastModule(shape, _) ⇒ - (Broadcast.props(effectiveAttributes.settings(settings), shape.outArray.size), shape.in, shape.outArray.toSeq) + (Broadcast.props(effectiveSettings, shape.outArray.size), shape.in, shape.outArray.toSeq) case BalanceModule(shape, waitForDownstreams, _) ⇒ - (Balance.props(effectiveAttributes.settings(settings), shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq) + (Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq) case UnzipModule(shape, _) ⇒ - (Unzip.props(effectiveAttributes.settings(settings)), shape.in, shape.outlets) + (Unzip.props(effectiveSettings), shape.in, shape.outlets) } - val impl = actorOf(props, stageName(effectiveAttributes), effectiveAttributes.settings(settings).dispatcher) + val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher) val size = outs.size def factory(id: Int) = new ActorPublisher[Any](impl) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) @@ -215,25 +233,26 @@ private[akka] class StreamSupervisor(settings: ActorFlowMaterializerSettings) ex */ private[akka] object ActorProcessorFactory { import akka.stream.impl.Stages._ + import ActorFlowMaterializerImpl._ def props(materializer: ActorFlowMaterializerImpl, op: StageModule, parentAttributes: OperationAttributes): (Props, Any) = { val att = parentAttributes and op.attributes // USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW // Also, otherwise the attributes will not affect the settings properly! - val settings = att.settings(materializer.settings) + val settings = calcSettings(att)(materializer.settings) op match { - case Identity(_) ⇒ (ActorInterpreter.props(settings, List(fusing.Map({ x: Any ⇒ x }, att.settings(settings).supervisionDecider))), ()) + case Identity(_) ⇒ (ActorInterpreter.props(settings, List(fusing.Map({ x: Any ⇒ x }, settings.supervisionDecider))), ()) case Fused(ops, _) ⇒ (ActorInterpreter.props(settings, ops), ()) - case Map(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(f, att.settings(settings).supervisionDecider))), ()) - case Filter(p, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Filter(p, att.settings(settings).supervisionDecider))), ()) + case Map(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(f, settings.supervisionDecider))), ()) + case Filter(p, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Filter(p, settings.supervisionDecider))), ()) case Drop(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Drop(n))), ()) case Take(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Take(n))), ()) - case Collect(pf, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Collect(att.settings(settings).supervisionDecider)(pf))), ()) - case Scan(z, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Scan(z, f, att.settings(settings).supervisionDecider))), ()) + case Collect(pf, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Collect(settings.supervisionDecider)(pf))), ()) + case Scan(z, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Scan(z, f, settings.supervisionDecider))), ()) case Expand(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Expand(s, f))), ()) - case Conflate(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, att.settings(settings).supervisionDecider))), ()) + case Conflate(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, settings.supervisionDecider))), ()) case Buffer(n, s, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Buffer(n, s))), ()) - case MapConcat(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapConcat(f, att.settings(settings).supervisionDecider))), ()) + case MapConcat(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapConcat(f, settings.supervisionDecider))), ()) case MapAsync(f, _) ⇒ (MapAsyncProcessorImpl.props(settings, f), ()) case MapAsyncUnordered(f, _) ⇒ (MapAsyncUnorderedProcessorImpl.props(settings, f), ()) case Grouped(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Grouped(n))), ()) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 5bef7b043e..d8f6025b97 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -97,7 +97,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * @tparam T materialized type of given KeyedSource * @tparam U materialized type of given KeyedSink */ - def runWith[T, U](source: javadsl.Source[In, T], sink: javadsl.Sink[Out, U], materializer: ActorFlowMaterializer): akka.japi.Pair[T, U] = { + def runWith[T, U](source: javadsl.Source[In, T], sink: javadsl.Sink[Out, U], materializer: FlowMaterializer): akka.japi.Pair[T, U] = { val p = delegate.runWith(source.asScala, sink.asScala)(materializer) akka.japi.Pair(p._1.asInstanceOf[T], p._2.asInstanceOf[U]) } @@ -386,7 +386,7 @@ trait RunnableFlow[+Mat] extends Graph[ClosedShape, Mat] { /** * Run this flow and return the [[MaterializedMap]] containing the values for the [[KeyedMaterializable]] of the flow. */ - def run(materializer: ActorFlowMaterializer): Mat + def run(materializer: FlowMaterializer): Mat /** * Transform only the materialized value of this RunnableFlow, leaving all other properties as they were. */ @@ -399,5 +399,5 @@ private[akka] class RunnableFlowAdapter[Mat](runnable: scaladsl.RunnableFlow[Mat def module = runnable.module override def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): RunnableFlow[Mat2] = new RunnableFlowAdapter(runnable.mapMaterialized(f.apply _)) - override def run(materializer: ActorFlowMaterializer): Mat = runnable.run()(materializer) + override def run(materializer: FlowMaterializer): Mat = runnable.run()(materializer) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 6ca93786dd..7486f7efcd 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -296,6 +296,6 @@ object FlowGraph { def sink[T](sink: Sink[T, _]): Inlet[T] = delegate.add(sink.asScala) - def run(mat: ActorFlowMaterializer): Unit = delegate.buildRunnable().run()(mat) + def run(mat: FlowMaterializer): Unit = delegate.buildRunnable().run()(mat) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index c58d068114..844522c08b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -116,7 +116,7 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[ /** * Connect this `Sink` to a `Source` and run it. */ - def runWith[M](source: javadsl.Source[In, M], materializer: ActorFlowMaterializer): M = + def runWith[M](source: javadsl.Source[In, M], materializer: FlowMaterializer): M = asScala.runWith(source.asScala)(materializer) /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 1a3c1b2cc0..ec1e1f7839 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -202,11 +202,17 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour def to[M](sink: javadsl.Sink[Out, M]): javadsl.RunnableFlow[Mat] = new RunnableFlowAdapter(delegate.to(sink.asScala)) + /** + * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both. + */ + def to[M, M2](sink: javadsl.Sink[Out, M], combine: japi.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = + new RunnableFlowAdapter(delegate.toMat(sink.asScala)(combinerToScala(combine))) + /** * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value * of the `Sink`, e.g. the `Publisher` of a `Sink.publisher()`. */ - def runWith[M](sink: Sink[Out, M], materializer: ActorFlowMaterializer): M = + def runWith[M](sink: Sink[Out, M], materializer: FlowMaterializer): M = delegate.runWith(sink.asScala)(materializer) /** @@ -217,7 +223,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * function evaluation when the input stream ends, or completed with `Failure` * if there is a failure is signaled in the stream. */ - def runFold[U](zero: U, f: japi.Function2[U, Out, U], materializer: ActorFlowMaterializer): Future[U] = + def runFold[U](zero: U, f: japi.Function2[U, Out, U], materializer: FlowMaterializer): Future[U] = runWith(Sink.fold(zero, f), materializer) /** @@ -235,7 +241,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * normal end of the stream, or completed with `Failure` if there is a failure is signaled in * the stream. */ - def runForeach(f: japi.Procedure[Out], materializer: ActorFlowMaterializer): Future[Unit] = + def runForeach(f: japi.Procedure[Out], materializer: FlowMaterializer): Future[Unit] = runWith(Sink.foreach(f), materializer) // COMMON OPS // diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamTcp.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamTcp.scala index 9064379c3e..df718d0c94 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamTcp.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamTcp.scala @@ -13,7 +13,7 @@ import akka.actor.ActorSystem import akka.actor.ExtendedActorSystem import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import akka.stream.scaladsl import akka.util.ByteString import akka.japi.Util.immutableSeq @@ -60,7 +60,7 @@ object StreamTcp extends ExtensionId[StreamTcp] with ExtensionIdProvider { * * Convenience shortcut for: `flow.join(handler).run()`. */ - def handleWith[Mat](handler: Flow[ByteString, ByteString, Mat], materializer: ActorFlowMaterializer): Mat = + def handleWith[Mat](handler: Flow[ByteString, ByteString, Mat], materializer: FlowMaterializer): Mat = delegate.handleWith(handler.asScala)(materializer) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index fb36669700..9606b6bc5e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -132,7 +132,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * the materialized values of the `Source` and `Sink`, e.g. the `Subscriber` of a [[SubscriberSource]] and * and `Publisher` of a [[PublisherSink]]. */ - def runWith[Mat1, Mat2](source: Source[In, Mat1], sink: Sink[Out, Mat2])(implicit materializer: ActorFlowMaterializer): (Mat1, Mat2) = { + def runWith[Mat1, Mat2](source: Source[In, Mat1], sink: Sink[Out, Mat2])(implicit materializer: FlowMaterializer): (Mat1, Mat2) = { source.via(this).toMat(sink)(Keep.both).run() } @@ -188,7 +188,7 @@ case class RunnableFlow[+Mat](private[stream] val module: StreamLayout.Module) e /** * Run this flow and return the materialized instance from the flow. */ - def run()(implicit materializer: ActorFlowMaterializer): Mat = materializer.materialize(this) + def run()(implicit materializer: FlowMaterializer): Mat = materializer.materialize(this) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ImplicitFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ImplicitFlowMaterializer.scala index 07a3fcdd34..95d0113fd8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ImplicitFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ImplicitFlowMaterializer.scala @@ -9,16 +9,16 @@ import akka.stream.ActorFlowMaterializer /** * Mix this trait into your [[akka.actor.Actor]] if you need an implicit - * [[akka.stream.ActorFlowMaterializer]] in scope. + * [[akka.stream.FlowMaterializer]] in scope. * * Subclass may override [[#flowMaterializerSettings]] to define custom - * [[akka.stream.ActorFlowMaterializerSettings]] for the `ActorFlowMaterializer`. + * [[akka.stream.FlowMaterializerSettings]] for the `FlowMaterializer`. */ trait ImplicitFlowMaterializer { this: Actor ⇒ /** * Subclass may override this to define custom - * [[akka.stream.ActorFlowMaterializerSettings]] for the `ActorFlowMaterializer`. + * [[akka.stream.FlowMaterializerSettings]] for the `FlowMaterializer`. */ def flowMaterializerSettings: ActorFlowMaterializerSettings = ActorFlowMaterializerSettings(context.system) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala index ff50faf9fb..4d3f956aa8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala @@ -3,7 +3,6 @@ */ package akka.stream.scaladsl -import akka.stream.ActorFlowMaterializerSettings import akka.stream.impl.Stages.StageModule import akka.stream.Supervision @@ -11,7 +10,7 @@ import akka.stream.Supervision * Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]] * materialization. */ -final case class OperationAttributes private (private val attributes: List[OperationAttributes.Attribute] = Nil) { +final case class OperationAttributes private (attributes: List[OperationAttributes.Attribute] = Nil) { import OperationAttributes._ @@ -33,14 +32,6 @@ final case class OperationAttributes private (private val attributes: List[Opera case _ ⇒ "unknown-operation" } - private[akka] def settings: ActorFlowMaterializerSettings ⇒ ActorFlowMaterializerSettings = - attributes.collect { - case InputBuffer(initial, max) ⇒ (s: ActorFlowMaterializerSettings) ⇒ s.withInputBuffer(initial, max) - case Dispatcher(dispatcher) ⇒ (s: ActorFlowMaterializerSettings) ⇒ s.withDispatcher(dispatcher) - case SupervisionStrategy(decider) ⇒ (s: ActorFlowMaterializerSettings) ⇒ - s.withSupervisionStrategy(decider) - }.reduceOption(_ andThen _).getOrElse(identity) // FIXME is this the optimal way of encoding this? - private[akka] def transform(node: StageModule): StageModule = if ((this eq OperationAttributes.none) || (this eq node.attributes)) node else node.withAttributes(attributes = this and node.attributes) @@ -60,11 +51,11 @@ final case class OperationAttributes private (private val attributes: List[Opera object OperationAttributes { - private[OperationAttributes] trait Attribute - private[OperationAttributes] final case class Name(n: String) extends Attribute - private[OperationAttributes] final case class InputBuffer(initial: Int, max: Int) extends Attribute - private[OperationAttributes] final case class Dispatcher(dispatcher: String) extends Attribute - private[OperationAttributes] final case class SupervisionStrategy(decider: Supervision.Decider) extends Attribute + sealed trait Attribute + final case class Name(n: String) extends Attribute + final case class InputBuffer(initial: Int, max: Int) extends Attribute + final case class Dispatcher(dispatcher: String) extends Attribute + final case class SupervisionStrategy(decider: Supervision.Decider) extends Attribute private[OperationAttributes] def apply(attribute: Attribute): OperationAttributes = apply(List(attribute)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 9ccceb5369..8b24909a11 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -12,7 +12,7 @@ import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.{ Promise, Future } import scala.util.{ Success, Failure, Try } -import akka.stream.ActorFlowMaterializer +import akka.stream.FlowMaterializer import akka.stream.impl.StreamLayout.Module /** @@ -28,7 +28,7 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) * Connect this `Sink` to a `Source` and run it. The returned value is the materialized value * of the `Source`, e.g. the `Subscriber` of a [[SubscriberSource]]. */ - def runWith[Mat2](source: Source[In, Mat2])(implicit materializer: ActorFlowMaterializer): Mat2 = + def runWith[Mat2](source: Source[In, Mat2])(implicit materializer: FlowMaterializer): Mat2 = source.to(this).run() def mapMaterialized[Mat2](f: Mat ⇒ Mat2): Sink[In, Mat2] = diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 31847c3433..d0e30000df 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -16,7 +16,7 @@ import org.reactivestreams.Publisher import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ ExecutionContext, Future } -import akka.stream.{ ActorFlowMaterializer, Graph } +import akka.stream.{ FlowMaterializer, Graph } import akka.stream.impl._ import akka.actor.Cancellable import akka.actor.ActorRef @@ -96,7 +96,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]]. */ - def runWith[Mat2](sink: Sink[Out, Mat2])(implicit materializer: ActorFlowMaterializer): Mat2 = toMat(sink)(Keep.right).run() + def runWith[Mat2](sink: Sink[Out, Mat2])(implicit materializer: FlowMaterializer): Mat2 = toMat(sink)(Keep.right).run() /** * Shortcut for running this `Source` with a fold function. @@ -106,7 +106,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) * function evaluation when the input stream ends, or completed with `Failure` * if there is a failure signaled in the stream. */ - def runFold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: ActorFlowMaterializer): Future[U] = + def runFold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: FlowMaterializer): Future[U] = runWith(Sink.fold(zero)(f)) /** @@ -116,7 +116,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) * normal end of the stream, or completed with `Failure` if there is a failure signaled in * the stream. */ - def runForeach(f: Out ⇒ Unit)(implicit materializer: ActorFlowMaterializer): Future[Unit] = runWith(Sink.foreach(f)) + def runForeach(f: Out ⇒ Unit)(implicit materializer: FlowMaterializer): Future[Unit] = runWith(Sink.foreach(f)) /** * Concatenates a second source so that the first element diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala index 8c8dee3774..a65cf9297b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala @@ -53,7 +53,7 @@ object StreamTcp extends ExtensionId[StreamTcp] with ExtensionIdProvider { * * Convenience shortcut for: `flow.join(handler).run()`. */ - def handleWith[Mat](handler: Flow[ByteString, ByteString, Mat])(implicit materializer: ActorFlowMaterializer): Mat = + def handleWith[Mat](handler: Flow[ByteString, ByteString, Mat])(implicit materializer: FlowMaterializer): Mat = flow.joinMat(handler)(Keep.right).run() } @@ -133,7 +133,7 @@ class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { endpoint: InetSocketAddress, backlog: Int = 100, options: immutable.Traversable[SocketOption] = Nil, - idleTimeout: Duration = Duration.Inf)(implicit m: ActorFlowMaterializer): Future[ServerBinding] = { + idleTimeout: Duration = Duration.Inf)(implicit m: FlowMaterializer): Future[ServerBinding] = { bind(endpoint, backlog, options, idleTimeout).to(Sink.foreach { conn: IncomingConnection ⇒ conn.flow.join(handler).run() }).run() diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/package.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/package.scala index 04c2c9a9fd..1a8fdc9114 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/package.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/package.scala @@ -43,12 +43,12 @@ package akka.stream * * By default every operation is executed within its own [[akka.actor.Actor]] * to enable full pipelining of the chained set of computations. This behavior - * is determined by the [[akka.stream.ActorFlowMaterializer]] which is required + * is determined by the [[akka.stream.FlowMaterializer]] which is required * by those methods that materialize the Flow into a series of * [[org.reactivestreams.Processor]] instances. The returned reactive stream * is fully started and active. * - * Use [[ImplicitFlowMaterializer]] to define an implicit [[akka.stream.ActorFlowMaterializer]] + * Use [[ImplicitFlowMaterializer]] to define an implicit [[akka.stream.FlowMaterializer]] * inside an [[akka.actor.Actor]]. */ package object scaladsl {