diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala index 9e90944a1a..32867ecd92 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala @@ -88,7 +88,7 @@ private[http] object HttpServerBluePrint { (requestParsing, renderer) ⇒ import FlowGraph.Implicits._ - val bypassFanout = b.add(Broadcast[RequestOutput](2, OperationAttributes.name("bypassFanout"))) + val bypassFanout = b.add(Broadcast[RequestOutput](2).named("bypassFanout")) val bypassMerge = b.add(new BypassMerge(settings, log)) val bypassInput = bypassMerge.in0 val bypassOneHundredContinueInput = bypassMerge.in1 diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala index ea3a89d214..715868b497 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala @@ -19,7 +19,7 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] { processorFromFlow( // withAttributes "wraps" the underlying identity and protects it from automatic removal - Flow[Int].andThen(Identity()).withAttributes(OperationAttributes.name("identity"))) + Flow[Int].andThen(Identity()).named("identity")) } override def createElement(element: Int): Int = element diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala index 1a3d606897..b559c12dd1 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala @@ -14,7 +14,7 @@ class MapTest extends AkkaIdentityProcessorVerification[Int] { implicit val materializer = ActorFlowMaterializer()(system) processorFromFlow( - Flow[Int].map(elem ⇒ elem).withAttributes(OperationAttributes.name("identity"))) + Flow[Int].map(elem ⇒ elem).named("identity")) } override def createElement(element: Int): Int = element diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/ChainSetup.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/ChainSetup.scala index 89506b1c28..09ebae5dab 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/ChainSetup.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/ChainSetup.scala @@ -24,7 +24,7 @@ class ChainSetup[In, Out]( val upstream = StreamTestKit.PublisherProbe[In]() val downstream = StreamTestKit.SubscriberProbe[Out]() - private val s = Source(upstream).via(stream(Flow[In].map(x ⇒ x).withAttributes(OperationAttributes.name("buh")))) + private val s = Source(upstream).via(stream(Flow[In].map(x ⇒ x).named("buh"))) val publisher = toPublisher(s, materializer) val upstreamSubscription = upstream.expectSubscription() publisher.subscribe(downstream) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala index 2a28bae34e..b34ff672a1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala @@ -32,7 +32,7 @@ class GraphJunctionAttributesSpec extends AkkaSpec { val slow = Source(0.seconds, 100.millis, SlowTick) val fast = Source(0.seconds, 10.millis, FastTick) - val zip = b add Zip[SlowTick, List[FastTick]](inputBuffer(1, 1)) + val zip = b add Zip[SlowTick, List[FastTick]]().withAttributes(inputBuffer(1, 1)) slow ~> zip.in0 fast.conflate(tick ⇒ List(tick)) { case (list, tick) ⇒ tick :: list } ~> zip.in1 diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template index 8857f19abf..04849804fa 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template @@ -3,8 +3,9 @@ */ package akka.stream.scaladsl +import akka.stream.impl.StreamLayout import akka.stream.impl.StreamLayout.Module -import akka.stream.{ Graph, Shape } +import akka.stream.{ Graph, OperationAttributes, Shape } trait GraphApply { @@ -29,10 +30,7 @@ trait GraphApply { val s = buildBlock(builder) val mod = builder.module.wrap().replaceShape(s) - new Graph[S, Unit] { - override def shape: S = s - override private[stream] def module: Module = mod - } + new GraphApply.GraphImpl(s, mod) } /** @@ -60,10 +58,7 @@ trait GraphApply { val s = buildBlock(builder)(s1) val mod = builder.module.wrap().replaceShape(s) - new Graph[S, Mat] { - override def shape: S = s - override private[stream] def module: Module = mod - } + new GraphApply.GraphImpl(s, mod) } @@ -98,13 +93,25 @@ trait GraphApply { val s = buildBlock(builder)([#s1#]) val mod = builder.module.wrap().replaceShape(s) - new Graph[S, Mat] { - override def shape: S = s - override private[stream] def module: Module = mod - } + new GraphApply.GraphImpl(s, mod) }# ] } + +/** + * INTERNAL API + */ +private[stream] object GraphApply { + class GraphImpl[S <: Shape, Mat](override val shape: S, private[stream] override val module: StreamLayout.Module) + extends Graph[S, Mat] { + + override def withAttributes(attr: OperationAttributes): Graph[S, Mat] = + new GraphImpl(shape, module.withAttributes(attr).wrap()) + + override def named(name: String): Graph[S, Mat] = withAttributes(OperationAttributes.name(name)) + } + +} diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template index 547a5c2ff5..aec5c994b3 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template @@ -3,8 +3,9 @@ */ package akka.stream.scaladsl -import akka.stream.impl.GenJunctions._ import akka.stream._ +import akka.stream.impl.GenJunctions._ +import akka.stream.impl.StreamLayout trait ZipWithApply { @@ -14,13 +15,25 @@ trait ZipWithApply { * @param f zipping-function from the input values to the output value * @param attributes optional attributes for this vertex */ - def apply[[#A1#], O](zipper: ([#A1#]) ⇒ O): Graph[FanInShape1[[#A1#], O], Unit] = - new Graph[FanInShape1[[#A1#], O], Unit] { - val shape = new FanInShape1[[#A1#], O]("ZipWith1") - val module = new ZipWith1Module(shape, zipper, OperationAttributes.name("ZipWith1")) + def apply[[#A1#], O](zipper: ([#A1#]) ⇒ O): ZipWith1[[#A1#], O] = { + val shape = new FanInShape1[[#A1#], O]("ZipWith1") + new ZipWith1(shape, new ZipWith1Module(shape, zipper, OperationAttributes.name("ZipWith1"))) } # ] } + +[2..20#/** `ZipWith` specialized for 1 inputs */ +class ZipWith1[[#A1#], O] private[stream] (override val shape: FanInShape1[[#A1#], O], + private[stream] override val module: StreamLayout.Module) + extends Graph[FanInShape1[[#A1#], O], Unit] { + + override def withAttributes(attr: OperationAttributes): ZipWith1[[#A1#], O] = + new ZipWith1(shape, module.withAttributes(attr).wrap()) + + override def named(name: String): ZipWith1[[#A1#], O] = withAttributes(OperationAttributes.name(name)) +} +# +] diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala index 1abbf56624..bd39bef27c 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala @@ -235,7 +235,7 @@ final case class ActorFlowMaterializerSettings( /** * Scala API: Decides how exceptions from application code are to be handled, unless * overridden for specific flows of the stream operations with - * [[akka.stream.scaladsl.OperationAttributes#supervisionStrategy]]. + * [[akka.stream.OperationAttributes#supervisionStrategy]]. */ def withSupervisionStrategy(decider: Supervision.Decider): ActorFlowMaterializerSettings = copy(supervisionDecider = decider) @@ -243,7 +243,7 @@ final case class ActorFlowMaterializerSettings( /** * Java API: Decides how exceptions from application code are to be handled, unless * overridden for specific flows of the stream operations with - * [[akka.stream.javadsl.OperationAttributes#supervisionStrategy]]. + * [[akka.stream.OperationAttributes#supervisionStrategy]]. */ def withSupervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): ActorFlowMaterializerSettings = { import Supervision._ diff --git a/akka-stream/src/main/scala/akka/stream/Graph.scala b/akka-stream/src/main/scala/akka/stream/Graph.scala index edab6d8b41..5cc15cb93d 100644 --- a/akka-stream/src/main/scala/akka/stream/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/Graph.scala @@ -21,4 +21,8 @@ trait Graph[+S <: Shape, +M] { * Every materializable element must be backed by a stream layout module */ private[stream] def module: StreamLayout.Module + + def withAttributes(attr: OperationAttributes): Graph[S, M] + + def named(name: String): Graph[S, M] = withAttributes(OperationAttributes.name(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala b/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala index fa38dca65f..19eac25f08 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala @@ -63,7 +63,7 @@ private[stream] object Junctions { final case class FlexiMergeModule[T, S <: Shape]( shape: S, flexi: S ⇒ MergeLogic[T], - override val attributes: OperationAttributes = name("flexiMerge")) extends FanInModule { + override val attributes: OperationAttributes) extends FanInModule { require(shape.outlets.size == 1, "FlexiMerge can have only one output port") @@ -75,7 +75,7 @@ private[stream] object Junctions { final case class FlexiRouteModule[T, S <: Shape]( shape: S, flexi: S ⇒ RouteLogic[T], - override val attributes: OperationAttributes = name("flexiRoute")) extends FanOutModule { + override val attributes: OperationAttributes) extends FanOutModule { require(shape.inlets.size == 1, "FlexiRoute can have only one input port") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala index 9316ebd113..84ea594fed 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala @@ -6,6 +6,7 @@ package akka.stream.javadsl import akka.stream.scaladsl import akka.stream.Graph import akka.stream.BidiShape +import akka.stream.OperationAttributes object BidiFlow { @@ -122,4 +123,7 @@ class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O1, I2, * Turn this BidiFlow around by 180 degrees, logically flipping it upside down in a protocol stack. */ def reversed: BidiFlow[I2, O2, I1, O1, Mat] = new BidiFlow(delegate.reversed) + + override def withAttributes(attr: OperationAttributes): BidiFlow[I1, O1, I2, O2, Mat] = + new BidiFlow(delegate.withAttributes(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala index bf0cc80ecd..145003e6b5 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala @@ -12,6 +12,7 @@ import akka.japi.Util.immutableIndexedSeq import akka.stream._ import akka.stream.impl.StreamLayout import akka.stream.impl.Junctions.FlexiMergeModule +import akka.stream.impl.Stages.DefaultAttributes object FlexiMerge { @@ -314,7 +315,12 @@ object FlexiMerge { abstract class FlexiMerge[T, Out, S <: Shape](val shape: S, val attributes: OperationAttributes) extends Graph[S, Unit] { import FlexiMerge._ - val module: StreamLayout.Module = new FlexiMergeModule(shape, (s: S) ⇒ new Internal.MergeLogicWrapper(createMergeLogic(s))) + /** + * INTERNAL API + */ + private[stream] val module: StreamLayout.Module = + new FlexiMergeModule(shape, (s: S) ⇒ new Internal.MergeLogicWrapper(createMergeLogic(s)), + attributes and DefaultAttributes.flexiMerge) def createMergeLogic(s: S): MergeLogic[T, Out] @@ -322,4 +328,10 @@ abstract class FlexiMerge[T, Out, S <: Shape](val shape: S, val attributes: Oper case Some(n) ⇒ n case None ⇒ super.toString } + + override def withAttributes(attr: OperationAttributes): Graph[S, Unit] = + throw new UnsupportedOperationException( + "withAttributes not supported by default by FlexiMerge, subclass may override and implement it") + + override def named(name: String): Graph[S, Unit] = withAttributes(OperationAttributes.name(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala index 0ec9ad0093..cf70700377 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala @@ -11,6 +11,7 @@ import akka.japi.Util.immutableIndexedSeq import akka.stream._ import akka.stream.impl.StreamLayout import akka.stream.impl.Junctions.FlexiRouteModule +import akka.stream.impl.Stages.DefaultAttributes object FlexiRoute { @@ -277,7 +278,12 @@ object FlexiRoute { abstract class FlexiRoute[In, S <: Shape](val shape: S, val attributes: OperationAttributes) extends Graph[S, Unit] { import FlexiRoute._ - val module: StreamLayout.Module = new FlexiRouteModule(shape, (s: S) ⇒ new Internal.RouteLogicWrapper(createRouteLogic(s))) + /** + * INTERNAL API + */ + private[stream] val module: StreamLayout.Module = + new FlexiRouteModule(shape, (s: S) ⇒ new Internal.RouteLogicWrapper(createRouteLogic(s)), + attributes and DefaultAttributes.flexiRoute) /** * Create the stateful logic that will be used when reading input elements @@ -290,4 +296,10 @@ abstract class FlexiRoute[In, S <: Shape](val shape: S, val attributes: Operatio case None ⇒ super.toString } + override def withAttributes(attr: OperationAttributes): Graph[S, Unit] = + throw new UnsupportedOperationException( + "withAttributes not supported by default by FlexiRoute, subclass may override and implement it") + + override def named(name: String): Graph[S, Unit] = withAttributes(OperationAttributes.name(name)) + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 596691fd7c..f1f173a93c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -408,10 +408,10 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph def concat[M](second: javadsl.Source[Out @uncheckedVariance, M]): javadsl.Flow[In, Out, Mat @uncheckedVariance Pair M] = new Flow(delegate.concat(second.asScala).mapMaterialized(p ⇒ Pair(p._1, p._2))) - def withAttributes(attr: OperationAttributes): javadsl.Flow[In, Out, Mat] = + override def withAttributes(attr: OperationAttributes): javadsl.Flow[In, Out, Mat] = new Flow(delegate.withAttributes(attr)) - def named(name: String): javadsl.Flow[In, Out, Mat] = + override def named(name: String): javadsl.Flow[In, Out, Mat] = new Flow(delegate.named(name)) } @@ -438,4 +438,10 @@ private[akka] class RunnableFlowAdapter[Mat](runnable: scaladsl.RunnableFlow[Mat override def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): RunnableFlow[Mat2] = new RunnableFlowAdapter(runnable.mapMaterialized(f.apply _)) override def run(materializer: FlowMaterializer): Mat = runnable.run()(materializer) + + override def withAttributes(attr: OperationAttributes): RunnableFlow[Mat] = + new RunnableFlowAdapter(runnable.withAttributes(attr)) + + override def named(name: String): RunnableFlow[Mat] = + new RunnableFlowAdapter(runnable.named(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 2f911b9b82..0ecf25315b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -20,32 +20,17 @@ import akka.japi.Pair */ object Merge { - /** - * Create a new `Merge` vertex with the specified output type and attributes. - * - * @param attributes optional attributes for this vertex - */ - def create[T](outputCount: Int, attributes: OperationAttributes): Graph[UniformFanInShape[T, T], Unit] = - scaladsl.Merge(outputCount, attributes) - /** * Create a new `Merge` vertex with the specified output type. */ - def create[T](outputCount: Int): Graph[UniformFanInShape[T, T], Unit] = create(outputCount, OperationAttributes.none) + def create[T](outputCount: Int): Graph[UniformFanInShape[T, T], Unit] = + scaladsl.Merge(outputCount) /** * Create a new `Merge` vertex with the specified output type. */ def create[T](clazz: Class[T], outputCount: Int): Graph[UniformFanInShape[T, T], Unit] = create(outputCount) - /** - * Create a new `Merge` vertex with the specified output type and attributes. - * - * @param attributes optional attributes for this vertex - */ - def create[T](clazz: Class[T], outputCount: Int, attributes: OperationAttributes): Graph[UniformFanInShape[T, T], Unit] = - create(outputCount, attributes) - } /** @@ -61,32 +46,17 @@ object Merge { * instances. */ object MergePreferred { - /** - * Create a new `MergePreferred` vertex with the specified output type and attributes. - * - * @param attributes optional attributes for this vertex - */ - def create[T](outputCount: Int, attributes: OperationAttributes): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = - scaladsl.MergePreferred(outputCount, attributes) - /** * Create a new `MergePreferred` vertex with the specified output type. */ - def create[T](outputCount: Int): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = create(outputCount, OperationAttributes.none) + def create[T](outputCount: Int): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = + scaladsl.MergePreferred(outputCount) /** * Create a new `MergePreferred` vertex with the specified output type. */ def create[T](clazz: Class[T], outputCount: Int): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = create(outputCount) - /** - * Create a new `MergePreferred` vertex with the specified output type and attributes. - * - * @param attributes optional attributes for this vertex - */ - def create[T](clazz: Class[T], outputCount: Int, attributes: OperationAttributes): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = - create(outputCount, attributes) - } /** @@ -100,31 +70,17 @@ object MergePreferred { * instances. */ object Broadcast { - /** - * Create a new `Broadcast` vertex with the specified input type and attributes. - * - * @param attributes optional attributes for this vertex - */ - def create[T](outputCount: Int, attributes: OperationAttributes): Graph[UniformFanOutShape[T, T], Unit] = - scaladsl.Broadcast(outputCount, attributes) - /** * Create a new `Broadcast` vertex with the specified input type. */ - def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount, OperationAttributes.none) + def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = + scaladsl.Broadcast(outputCount) /** * Create a new `Broadcast` vertex with the specified input type. */ def create[T](clazz: Class[T], outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount) - /** - * Create a new `Broadcast` vertex with the specified input type and attributes. - * - * @param attributes optional attributes for this vertex - */ - def create[T](clazz: Class[T], outputCount: Int, attributes: OperationAttributes): Graph[UniformFanOutShape[T, T], Unit] = - create(outputCount, attributes) } /** @@ -139,24 +95,18 @@ object Broadcast { */ object Balance { /** - * Create a new `Balance` vertex with the specified input type and attributes. + * Create a new `Balance` vertex with the specified input type. * * @param waitForAllDownstreams if `true` it will not start emitting * elements to downstream outputs until all of them have requested at least one element - * @param attributes optional attributes for this vertex */ - def create[T](outputCount: Int, waitForAllDownstreams: Boolean, attributes: OperationAttributes): Graph[UniformFanOutShape[T, T], Unit] = - scaladsl.Balance(outputCount, waitForAllDownstreams, attributes) + def create[T](outputCount: Int, waitForAllDownstreams: Boolean): Graph[UniformFanOutShape[T, T], Unit] = + scaladsl.Balance(outputCount, waitForAllDownstreams) /** * Create a new `Balance` vertex with the specified input type. */ - def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount, false, OperationAttributes.none) - - /** - * Create a new `Balance` vertex with the specified input type. - */ - def create[T](outputCount: Int, attributes: OperationAttributes): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount, false, attributes) + def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount, false) /** * Create a new `Balance` vertex with the specified input type. @@ -164,12 +114,13 @@ object Balance { def create[T](clazz: Class[T], outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount) /** - * Create a new `Balance` vertex with the specified input type and attributes. + * Create a new `Balance` vertex with the specified input type. * - * @param attributes optional attributes for this vertex + * @param waitForAllDownstreams if `true` it will not start emitting + * elements to downstream outputs until all of them have requested at least one element */ - def create[T](clazz: Class[T], outputCount: Int, attributes: OperationAttributes): Graph[UniformFanOutShape[T, T], Unit] = - create(outputCount, false, attributes) + def create[T](clazz: Class[T], outputCount: Int, waitForAllDownstreams: Boolean): Graph[UniformFanOutShape[T, T], Unit] = + create(outputCount, waitForAllDownstreams) } object Zip { @@ -196,36 +147,21 @@ object Zip { object Unzip { /** - * Creates a new `Unzip` vertex with the specified output types and attributes. - * - * @param attributes attributes for this vertex + * Creates a new `Unzip` vertex with the specified output types. */ - def create[A, B](attributes: OperationAttributes): Graph[FanOutShape2[A Pair B, A, B], Unit] = + def create[A, B](): Graph[FanOutShape2[A Pair B, A, B], Unit] = scaladsl.FlowGraph.partial() { implicit b ⇒ - val unzip = b.add(scaladsl.Unzip[A, B](attributes)) + val unzip = b.add(scaladsl.Unzip[A, B]()) val tuple = b.add(scaladsl.Flow[A Pair B].map(p ⇒ (p.first, p.second))) b.addEdge(tuple.outlet, unzip.in) new FanOutShape2(FanOutShape.Ports(tuple.inlet, unzip.out0 :: unzip.out1 :: Nil)) } - /** - * Creates a new `Unzip` vertex with the specified output types and attributes. - */ - def create[A, B](): Graph[FanOutShape2[A Pair B, A, B], Unit] = create(OperationAttributes.none) - /** * Creates a new `Unzip` vertex with the specified output types. */ def create[A, B](left: Class[A], right: Class[B]): Graph[FanOutShape2[A Pair B, A, B], Unit] = create[A, B]() - /** - * Creates a new `Unzip` vertex with the specified output types and attributes. - * - * @param attributes optional attributes for this vertex - */ - def create[A, B](left: Class[A], right: Class[B], attributes: OperationAttributes): Graph[FanOutShape2[A Pair B, A, B], Unit] = - create[A, B](attributes) - } /** @@ -245,7 +181,7 @@ object Concat { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`. */ - def create[T](): Graph[UniformFanInShape[T, T], Unit] = create(OperationAttributes.none) + def create[T](): Graph[UniformFanInShape[T, T], Unit] = scaladsl.Concat[T]() /** * Create a new anonymous `Concat` vertex with the specified input types. @@ -253,15 +189,7 @@ object Concat { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`. */ - def create[T](attributes: OperationAttributes): Graph[UniformFanInShape[T, T], Unit] = scaladsl.Concat[T](attributes) - - /** - * Create a new anonymous `Concat` vertex with the specified input types. - * Note that a `Concat` instance can only be used at one place (one vertex) - * in the `FlowGraph`. This method creates a new instance every time it - * is called and those instances are not `equal`. - */ - def create[T](clazz: Class[T], attributes: OperationAttributes): Graph[UniformFanInShape[T, T], Unit] = create(attributes) + def create[T](clazz: Class[T]): Graph[UniformFanInShape[T, T], Unit] = create() } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 4939aacbe4..dac8ec73c6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -148,9 +148,9 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[ def mapMaterialized[Mat2](f: japi.Function[Mat, Mat2]): Sink[In, Mat2] = new Sink(delegate.mapMaterialized(f.apply _)) - def withAttributes(attr: OperationAttributes): javadsl.Sink[In, Mat] = + override def withAttributes(attr: OperationAttributes): javadsl.Sink[In, Mat] = new Sink(delegate.withAttributes(attr)) - def named(name: String): javadsl.Sink[In, Mat] = + override def named(name: String): javadsl.Sink[In, Mat] = new Sink(delegate.named(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 3caa57cc82..c53d389648 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -498,10 +498,10 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Source[U, Mat] = new Source(delegate.flatten(strategy)) - def withAttributes(attr: OperationAttributes): javadsl.Source[Out, Mat] = + override def withAttributes(attr: OperationAttributes): javadsl.Source[Out, Mat] = new Source(delegate.withAttributes(attr)) - def named(name: String): javadsl.Source[Out, Mat] = + override def named(name: String): javadsl.Source[Out, Mat] = new Source(delegate.named(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index 56e7ab6156..c02effab40 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -7,6 +7,7 @@ import akka.stream.Graph import akka.stream.BidiShape import akka.stream.impl.StreamLayout.Module import akka.stream.FlowShape +import akka.stream.OperationAttributes final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val module: Module) extends Graph[BidiShape[I1, O1, I2, O2], Mat] { override val shape = module.shape.asInstanceOf[BidiShape[I1, O1, I2, O2]] @@ -119,6 +120,12 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu val outs = shape.outlets new BidiFlow(module.replaceShape(shape.copyFromPorts(ins.reverse, outs.reverse))) } + + override def withAttributes(attr: OperationAttributes): BidiFlow[I1, O1, I2, O2, Mat] = + new BidiFlow(module.withAttributes(attr).wrap()) + + override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] = + withAttributes(OperationAttributes.name(name)) } object BidiFlow extends BidiFlowApply { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala index ca265a0b80..cb62cce6fa 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala @@ -9,6 +9,7 @@ import scala.collection.immutable import scala.collection.immutable.Seq import akka.stream.impl.StreamLayout import akka.stream.impl.Junctions.FlexiMergeModule +import akka.stream.impl.Stages.DefaultAttributes object FlexiMerge { @@ -223,7 +224,11 @@ object FlexiMerge { * @param attributes optional attributes for this junction */ abstract class FlexiMerge[Out, S <: Shape](val shape: S, attributes: OperationAttributes) extends Graph[S, Unit] { - val module: StreamLayout.Module = new FlexiMergeModule(shape, createMergeLogic) + /** + * INTERNAL API + */ + private[stream] val module: StreamLayout.Module = + new FlexiMergeModule(shape, createMergeLogic, attributes and DefaultAttributes.flexiMerge) type PortT = S @@ -233,4 +238,10 @@ abstract class FlexiMerge[Out, S <: Shape](val shape: S, attributes: OperationAt case Some(n) ⇒ n case None ⇒ super.toString } + + override def withAttributes(attr: OperationAttributes): Graph[S, Unit] = + throw new UnsupportedOperationException( + "withAttributes not supported by default by FlexiMerge, subclass may override and implement it") + + override def named(name: String): Graph[S, Unit] = withAttributes(OperationAttributes.name(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala index 70eb2cb19d..e16ac52926 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala @@ -7,6 +7,7 @@ import akka.stream.impl.StreamLayout import akka.stream.{ Outlet, Shape, OutPort, Graph, OperationAttributes } import scala.collection.immutable import akka.stream.impl.Junctions.FlexiRouteModule +import akka.stream.impl.Stages.DefaultAttributes object FlexiRoute { @@ -192,7 +193,11 @@ object FlexiRoute { abstract class FlexiRoute[In, S <: Shape](val shape: S, attributes: OperationAttributes) extends Graph[S, Unit] { import akka.stream.scaladsl.FlexiRoute._ - val module: StreamLayout.Module = new FlexiRouteModule(shape, createRouteLogic) + /** + * INTERNAL API + */ + private[stream] val module: StreamLayout.Module = + new FlexiRouteModule(shape, createRouteLogic, attributes and DefaultAttributes.flexiRoute) /** * This allows a type-safe mini-DSL for selecting one of several ports, very useful in @@ -234,4 +239,11 @@ abstract class FlexiRoute[In, S <: Shape](val shape: S, attributes: OperationAtt case Some(n) ⇒ n case None ⇒ super.toString } + + // FIXME what to do about this? + override def withAttributes(attr: OperationAttributes): Graph[S, Unit] = + throw new UnsupportedOperationException( + "withAttributes not supported by default by FlexiRoute, subclass may override and implement it") + + override def named(name: String): Graph[S, Unit] = withAttributes(OperationAttributes.name(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 640f92b702..5857dc1420 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -268,6 +268,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) else new Flow(module.withAttributes(attr).wrap()) } + override def named(name: String): Repr[Out, Mat] = withAttributes(OperationAttributes.name(name)) + /** * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. The returned tuple contains * the materialized values of the `Source` and `Sink`, e.g. the `Subscriber` of a of a [[Source#subscriber]] and @@ -317,6 +319,12 @@ case class RunnableFlow[+Mat](private[stream] val module: StreamLayout.Module) e * Run this flow and return the materialized instance from the flow. */ def run()(implicit materializer: FlowMaterializer): Mat = materializer.materialize(this) + + override def withAttributes(attr: OperationAttributes): RunnableFlow[Mat] = + new RunnableFlow(module.withAttributes(attr).wrap) + + override def named(name: String): RunnableFlow[Mat] = withAttributes(OperationAttributes.name(name)) + } /** @@ -659,8 +667,6 @@ trait FlowOps[+Out, +Mat] { def withAttributes(attr: OperationAttributes): Repr[Out, Mat] - def named(name: String): Repr[Out, Mat] = withAttributes(OperationAttributes.name(name)) - /** INTERNAL API */ private[scaladsl] def andThen[U](op: StageModule): Repr[U, Mat] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index e14cc9086a..f03c8ace5d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -14,32 +14,36 @@ import scala.collection.immutable import scala.annotation.unchecked.uncheckedVariance import scala.annotation.tailrec +object Merge { + /** + * Create a new `Merge` with the specified number of input ports. + * + * @param inputPorts number of input ports + */ + def apply[T](inputPorts: Int): Merge[T] = { + val shape = new UniformFanInShape[T, T](inputPorts) + new Merge(inputPorts, shape, new MergeModule(shape, OperationAttributes.name("Merge"))) + } + +} + /** * Merge several streams, taking elements as they arrive from input streams * (picking randomly when several have elements ready). * * A `Merge` has one `out` port and one or more `in` ports. */ -object Merge { - /** - * Create a new `Merge` with the specified number of input ports and attributes. - * - * @param inputPorts number of input ports - * @param attributes optional attributes - */ - def apply[T](inputPorts: Int, attributes: OperationAttributes = OperationAttributes.none): Graph[UniformFanInShape[T, T], Unit] = - new Graph[UniformFanInShape[T, T], Unit] { - val shape = new UniformFanInShape[T, T](inputPorts) - val module = new MergeModule(shape, OperationAttributes.name("Merge") and attributes) - } +class Merge[T] private (inputPorts: Int, + override val shape: UniformFanInShape[T, T], + private[stream] override val module: StreamLayout.Module) + extends Graph[UniformFanInShape[T, T], Unit] { + + override def withAttributes(attr: OperationAttributes): Merge[T] = + new Merge(inputPorts, shape, module.withAttributes(attr).wrap()) + + override def named(name: String): Merge[T] = withAttributes(OperationAttributes.name(name)) } -/** - * Merge several streams, taking elements as they arrive from input streams - * (picking from preferred when several have elements ready). - * - * A `MergePreferred` has one `out` port, one `preferred` input port and 0 or more secondary `in` ports. - */ object MergePreferred { import FanInShape._ final class MergePreferredShape[T](val secondaryPorts: Int, _init: Init[T]) extends UniformFanInShape[T, T](secondaryPorts, _init) { @@ -51,16 +55,43 @@ object MergePreferred { } /** - * Create a new `PreferredMerge` with the specified number of secondary input ports and attributes. + * Create a new `MergePreferred` with the specified number of secondary input ports. * * @param secondaryPorts number of secondary input ports - * @param attributes optional attributes */ - def apply[T](secondaryPorts: Int, attributes: OperationAttributes = OperationAttributes.none): Graph[MergePreferredShape[T], Unit] = - new Graph[MergePreferredShape[T], Unit] { - val shape = new MergePreferredShape[T](secondaryPorts, "MergePreferred") - val module = new MergePreferredModule(shape, OperationAttributes.name("MergePreferred") and attributes) - } + def apply[T](secondaryPorts: Int): MergePreferred[T] = { + val shape = new MergePreferredShape[T](secondaryPorts, "MergePreferred") + new MergePreferred(secondaryPorts, shape, new MergePreferredModule(shape, OperationAttributes.name("MergePreferred"))) + } +} + +/** + * Merge several streams, taking elements as they arrive from input streams + * (picking from preferred when several have elements ready). + * + * A `MergePreferred` has one `out` port, one `preferred` input port and 0 or more secondary `in` ports. + */ +class MergePreferred[T] private (secondaryPorts: Int, + override val shape: MergePreferred.MergePreferredShape[T], + private[stream] override val module: StreamLayout.Module) + extends Graph[MergePreferred.MergePreferredShape[T], Unit] { + + override def withAttributes(attr: OperationAttributes): MergePreferred[T] = + new MergePreferred(secondaryPorts, shape, module.withAttributes(attr).wrap()) + + override def named(name: String): MergePreferred[T] = withAttributes(OperationAttributes.name(name)) +} + +object Broadcast { + /** + * Create a new `Broadcast` with the specified number of output ports. + * + * @param outputPorts number of output ports + */ + def apply[T](outputPorts: Int): Broadcast[T] = { + val shape = new UniformFanOutShape[T, T](outputPorts) + new Broadcast(outputPorts, shape, new BroadcastModule(shape, OperationAttributes.name("Broadcast"))) + } } /** @@ -70,18 +101,31 @@ object MergePreferred { * * A `Broadcast` has one `in` port and 2 or more `out` ports. */ -object Broadcast { +class Broadcast[T] private (outputPorts: Int, + override val shape: UniformFanOutShape[T, T], + private[stream] override val module: StreamLayout.Module) + extends Graph[UniformFanOutShape[T, T], Unit] { + + override def withAttributes(attr: OperationAttributes): Broadcast[T] = + new Broadcast(outputPorts, shape, module.withAttributes(attr).wrap()) + + override def named(name: String): Broadcast[T] = withAttributes(OperationAttributes.name(name)) +} + +object Balance { /** - * Create a new `Broadcast` with the specified number of output ports and attributes. + * Create a new `Balance` with the specified number of output ports. * * @param outputPorts number of output ports - * @param attributes optional attributes + * @param waitForAllDownstreams if you use `waitForAllDownstreams = true` it will not start emitting + * elements to downstream outputs until all of them have requested at least one element, + * default value is `false` */ - def apply[T](outputPorts: Int, attributes: OperationAttributes = OperationAttributes.none): Graph[UniformFanOutShape[T, T], Unit] = - new Graph[UniformFanOutShape[T, T], Unit] { - val shape = new UniformFanOutShape[T, T](outputPorts) - val module = new BroadcastModule(shape, OperationAttributes.name("Broadcast") and attributes) - } + def apply[T](outputPorts: Int, waitForAllDownstreams: Boolean = false): Balance[T] = { + val shape = new UniformFanOutShape[T, T](outputPorts) + new Balance(outputPorts, waitForAllDownstreams, shape, + new BalanceModule(shape, waitForAllDownstreams, OperationAttributes.name("Balance"))) + } } /** @@ -91,21 +135,26 @@ object Broadcast { * * A `Balance` has one `in` port and 2 or more `out` ports. */ -object Balance { +class Balance[T] private (outputPorts: Int, + waitForAllDownstreams: Boolean, + override val shape: UniformFanOutShape[T, T], + private[stream] override val module: StreamLayout.Module) + extends Graph[UniformFanOutShape[T, T], Unit] { + + override def withAttributes(attr: OperationAttributes): Balance[T] = + new Balance(outputPorts, waitForAllDownstreams, shape, module.withAttributes(attr).wrap()) + + override def named(name: String): Balance[T] = withAttributes(OperationAttributes.name(name)) +} + +object Zip { /** - * Create a new `Balance` with the specified number of output ports and attributes. - * - * @param outputPorts number of output ports - * @param waitForAllDownstreams if you use `waitForAllDownstreams = true` it will not start emitting - * elements to downstream outputs until all of them have requested at least one element, - * default value is `false` - * @param attributes optional attributes + * Create a new `Zip`. */ - def apply[T](outputPorts: Int, waitForAllDownstreams: Boolean = false, attributes: OperationAttributes = OperationAttributes.none): Graph[UniformFanOutShape[T, T], Unit] = - new Graph[UniformFanOutShape[T, T], Unit] { - val shape = new UniformFanOutShape[T, T](outputPorts) - val module = new BalanceModule(shape, waitForAllDownstreams, OperationAttributes.name("Balance") and attributes) - } + def apply[A, B](): Zip[A, B] = { + val shape = new FanInShape2[A, B, (A, B)]("Zip") + new Zip(shape, new ZipWith2Module[A, B, (A, B)](shape, Keep.both, OperationAttributes.name("Zip"))) + } } /** @@ -113,22 +162,16 @@ object Balance { * * A `Zip` has a `left` and a `right` input port and one `out` port */ -object Zip { - /** - * Create a new `Zip` with the specified attributes. - * - * @param attributes optional attributes - */ - def apply[A, B](attributes: OperationAttributes = OperationAttributes.none): Graph[FanInShape2[A, B, (A, B)], Unit] = - new Graph[FanInShape2[A, B, (A, B)], Unit] { - val shape = new FanInShape2[A, B, (A, B)]("Zip") - val module = new ZipWith2Module[A, B, (A, B)](shape, Keep.both, OperationAttributes.name("Zip") and attributes) - } +class Zip[A, B] private (override val shape: FanInShape2[A, B, (A, B)], + private[stream] override val module: StreamLayout.Module) + extends Graph[FanInShape2[A, B, (A, B)], Unit] { + + override def withAttributes(attr: OperationAttributes): Zip[A, B] = + new Zip(shape, module.withAttributes(attr).wrap()) + + override def named(name: String): Zip[A, B] = withAttributes(OperationAttributes.name(name)) } -/** - * Combine the elements of multiple streams into a stream of the combined elements. - */ object ZipWith extends ZipWithApply /** @@ -138,15 +181,35 @@ object ZipWith extends ZipWithApply */ object Unzip { /** - * Create a new `Unzip` with the specified attributes. - * - * @param attributes optional attributes + * Create a new `Unzip`. */ - def apply[A, B](attributes: OperationAttributes = OperationAttributes.none): Graph[FanOutShape2[(A, B), A, B], Unit] = - new Graph[FanOutShape2[(A, B), A, B], Unit] { - val shape = new FanOutShape2[(A, B), A, B]("Unzip") - val module = new UnzipModule(shape, OperationAttributes.name("Unzip") and attributes) - } + def apply[A, B](): Unzip[A, B] = { + val shape = new FanOutShape2[(A, B), A, B]("Unzip") + new Unzip(shape, new UnzipModule(shape, OperationAttributes.name("Unzip"))) + } +} + +/** + * Combine the elements of multiple streams into a stream of the combined elements. + */ +class Unzip[A, B] private (override val shape: FanOutShape2[(A, B), A, B], + private[stream] override val module: StreamLayout.Module) + extends Graph[FanOutShape2[(A, B), A, B], Unit] { + + override def withAttributes(attr: OperationAttributes): Unzip[A, B] = + new Unzip(shape, module.withAttributes(attr).wrap()) + + override def named(name: String): Unzip[A, B] = withAttributes(OperationAttributes.name(name)) +} + +object Concat { + /** + * Create a new `Concat`. + */ + def apply[T](): Concat[T] = { + val shape = new UniformFanInShape[T, T](2) + new Concat(shape, new ConcatModule(shape, OperationAttributes.name("Concat"))) + } } /** @@ -156,17 +219,14 @@ object Unzip { * * A `Concat` has one `first` port, one `second` port and one `out` port. */ -object Concat { - /** - * Create a new `Concat` with the specified attributes. - * - * @param attributes optional attributes - */ - def apply[A](attributes: OperationAttributes = OperationAttributes.none): Graph[UniformFanInShape[A, A], Unit] = - new Graph[UniformFanInShape[A, A], Unit] { - val shape = new UniformFanInShape[A, A](2) - val module = new ConcatModule(shape, OperationAttributes.name("Concat") and attributes) - } +class Concat[T] private (override val shape: UniformFanInShape[T, T], + private[stream] override val module: StreamLayout.Module) + extends Graph[UniformFanInShape[T, T], Unit] { + + override def withAttributes(attr: OperationAttributes): Concat[T] = + new Concat(shape, module.withAttributes(attr).wrap()) + + override def named(name: String): Concat[T] = withAttributes(OperationAttributes.name(name)) } object FlowGraph extends GraphApply { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index e5bdd48b7d..37d36a2089 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -38,10 +38,10 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) def mapMaterialized[Mat2](f: Mat ⇒ Mat2): Sink[In, Mat2] = new Sink(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) - def withAttributes(attr: OperationAttributes): Sink[In, Mat] = + override def withAttributes(attr: OperationAttributes): Sink[In, Mat] = new Sink(module.withAttributes(attr).wrap()) - def named(name: String): Sink[In, Mat] = withAttributes(OperationAttributes.name(name)) + override def named(name: String): Sink[In, Mat] = withAttributes(OperationAttributes.name(name)) /** Converts this Scala DSL element to it's Java DSL counterpart. */ def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(this) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 6f2b9a0ba9..68d55b3e6d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -148,6 +148,8 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] = new Source(module.withAttributes(attr).wrap()) + override def named(name: String): Repr[Out, Mat] = withAttributes(OperationAttributes.name(name)) + /** Converts this Scala DSL element to it's Java DSL counterpart. */ def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(this)