diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java index 3b15ce7b36..d20f398a34 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java @@ -272,4 +272,10 @@ public class BidiFlowTest extends StreamTest { Arrays.sort(rr); assertArrayEquals(new Long[] { 3L, 12L }, rr); } + + public void mustSuitablyOverrideAttributeHandlingMethods() { + @SuppressWarnings("unused") + final BidiFlow b = + bidi.withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named(""); + } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 61c7eef48a..bc1d87864e 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -785,4 +785,9 @@ public class FlowTest extends StreamTest { assertEquals((Object) 0, result); } + public void mustSuitablyOverrideAttributeHandlingMethods() { + @SuppressWarnings("unused") + final Flow f = + Flow.of(Integer.class).withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named(""); + } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index b0bbb22a13..80b688a549 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -13,16 +13,13 @@ import java.util.concurrent.TimeUnit; import akka.actor.ActorRef; import akka.japi.function.Function; import akka.japi.function.Procedure; -import akka.stream.Graph; -import akka.stream.UniformFanInShape; -import akka.stream.UniformFanOutShape; +import akka.stream.*; import org.junit.ClassRule; import org.junit.Test; import org.reactivestreams.Publisher; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; -import akka.stream.StreamTest; import akka.japi.function.Function2; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; @@ -101,4 +98,9 @@ public class SinkTest extends StreamTest { probe2.expectMsgEquals("done2"); } + public void mustSuitablyOverrideAttributeHandlingMethods() { + @SuppressWarnings("unused") + final Sink> s = + Sink. head().withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named(""); + } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 82543d17d9..cca8422953 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -11,10 +11,7 @@ import akka.dispatch.OnSuccess; import akka.japi.JavaPartialFunction; import akka.japi.Pair; import akka.japi.function.*; -import akka.stream.Graph; -import akka.stream.OverflowStrategy; -import akka.stream.StreamTest; -import akka.stream.UniformFanInShape; +import akka.stream.*; import akka.stream.impl.ConstantFun; import akka.stream.stage.*; import akka.stream.testkit.AkkaSpec; @@ -776,4 +773,9 @@ public class SourceTest extends StreamTest { assertEquals((Object) 0, result); } + public void mustSuitablyOverrideAttributeHandlingMethods() { + @SuppressWarnings("unused") + final Source f = + Source.single(42).withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named(""); + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala index b05d906a0f..7e87519ca6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala @@ -111,6 +111,11 @@ class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals { Await.result(r, 1.second).toSet should ===(Set(3L, 12L)) } + "suitably override attribute handling methods" in { + import Attributes._ + val b: BidiFlow[Int, Long, ByteString, String, Unit] = bidi.withAttributes(name("")).addAttributes(asyncBoundary).named("") + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 8caa8e8201..a93f305c04 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -586,6 +586,11 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } } } + + "suitably override attribute handling methods" in { + import Attributes._ + val f: Flow[Int, Int, Unit] = Flow[Int].withAttributes(asyncBoundary).addAttributes(none).named("") + } } object TestException extends RuntimeException with NoStackTrace diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index 708c6ee431..aaccae0480 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -3,9 +3,10 @@ */ package akka.stream.scaladsl -import akka.stream.{ SinkShape, ActorMaterializer } +import akka.stream._ import akka.stream.testkit.TestPublisher.ManualProbe import akka.stream.testkit._ +import scala.concurrent.Future class SinkSpec extends AkkaSpec { @@ -119,6 +120,10 @@ class SinkSpec extends AkkaSpec { } } + "suitably override attribute handling methods" in { + import Attributes._ + val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(asyncBoundary).addAttributes(none).named("") + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index ce3576b635..6bcd64aa69 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -10,7 +10,7 @@ import scala.concurrent.{ Future, Await } import scala.concurrent.duration._ import scala.util.Failure import scala.util.control.NoStackTrace -import akka.stream.{ SourceShape, ActorMaterializer } +import akka.stream._ import akka.stream.testkit._ class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures { @@ -268,4 +268,11 @@ class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures { } } + "A Source" must { + "suitably override attribute handling methods" in { + import Attributes._ + val s: Source[Int, Unit] = Source.single(42).withAttributes(asyncBoundary).addAttributes(none).named("") + } + } + } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala index cb8951606f..9dbbbdd849 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala @@ -28,6 +28,12 @@ class SubFlowImpl[In, Out, Mat, F[+_], C](val subFlow: Flow[In, Out, Unit], override def withAttributes(attr: Attributes): SubFlow[Out, Mat, F, C] = new SubFlowImpl[In, Out, Mat, F, C](subFlow.withAttributes(attr), mergeBackFunction, finishFunction) + override def addAttributes(attr: Attributes): SubFlow[Out, Mat, F, C] = + new SubFlowImpl[In, Out, Mat, F, C](subFlow.addAttributes(attr), mergeBackFunction, finishFunction) + + override def named(name: String): SubFlow[Out, Mat, F, C] = + new SubFlowImpl[In, Out, Mat, F, C](subFlow.named(name), mergeBackFunction, finishFunction) + override def mergeSubstreamsWithParallelism(breadth: Int): F[Out] = mergeBackFunction(subFlow, breadth) def to[M](sink: Graph[SinkShape[Out], M]): C = finishFunction(subFlow.to(sink)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala index dd1bda470c..490932f5f0 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala @@ -192,6 +192,28 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): BidiFlow[I1, O1, I2, O2, Mat2] = new BidiFlow(delegate.mapMaterializedValue(f.apply _)) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ override def withAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] = new BidiFlow(delegate.withAttributes(attr)) + + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] = + new BidiFlow(delegate.addAttributes(attr)) + + /** + * Add a ``name`` attribute to this Flow. + */ + override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] = + new BidiFlow(delegate.named(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 54f9477742..b1fbe4b14f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1431,9 +1431,28 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def initialDelay(delay: FiniteDuration): javadsl.Flow[In, Out, Mat] = new Flow(delegate.initialDelay(delay)) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] = new Flow(delegate.withAttributes(attr)) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.addAttributes(attr)) + + /** + * Add a ``name`` attribute to this Flow. + */ override def named(name: String): javadsl.Flow[In, Out, Mat] = new Flow(delegate.named(name)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index ef13ce7f40..3c0ed80ab0 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -259,9 +259,28 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] = new Sink(delegate.mapMaterializedValue(f.apply _)) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ override def withAttributes(attr: Attributes): javadsl.Sink[In, Mat] = new Sink(delegate.withAttributes(attr)) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): javadsl.Sink[In, Mat] = + new Sink(delegate.addAttributes(attr)) + + /** + * Add a ``name`` attribute to this Flow. + */ override def named(name: String): javadsl.Sink[In, Mat] = new Sink(delegate.named(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 7baaddb18d..33d214ed58 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1599,9 +1599,28 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def initialDelay(delay: FiniteDuration): javadsl.Source[Out, Mat] = new Source(delegate.initialDelay(delay)) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] = new Source(delegate.withAttributes(attr)) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): javadsl.Source[Out, Mat] = + new Source(delegate.addAttributes(attr)) + + /** + * Add a ``name`` attribute to this Flow. + */ override def named(name: String): javadsl.Source[Out, Mat] = new Source(delegate.named(name)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 654c29e27a..9a2e2f35f1 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1040,9 +1040,28 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def initialDelay(delay: FiniteDuration): SubFlow[In, Out, Mat] = new SubFlow(delegate.initialDelay(delay)) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ def withAttributes(attr: Attributes): SubFlow[In, Out, Mat] = new SubFlow(delegate.withAttributes(attr)) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + def addAttributes(attr: Attributes): SubFlow[In, Out, Mat] = + new SubFlow(delegate.addAttributes(attr)) + + /** + * Add a ``name`` attribute to this Flow. + */ def named(name: String): SubFlow[In, Out, Mat] = new SubFlow(delegate.named(name)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 4e23a20864..cc5121c282 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1039,9 +1039,28 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def initialDelay(delay: FiniteDuration): SubSource[Out, Mat] = new SubSource(delegate.initialDelay(delay)) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ def withAttributes(attr: Attributes): SubSource[Out, Mat] = new SubSource(delegate.withAttributes(attr)) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + def addAttributes(attr: Attributes): SubSource[Out, Mat] = + new SubSource(delegate.addAttributes(attr)) + + /** + * Add a ``name`` attribute to this Flow. + */ def named(name: String): SubSource[Out, Mat] = new SubSource(delegate.named(name)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index fd12c42a8f..9b652997e8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -128,9 +128,28 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): BidiFlow[I1, O1, I2, O2, Mat2] = new BidiFlow(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ override def withAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] = new BidiFlow(module.withAttributes(attr).nest()) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] = + withAttributes(module.attributes and attr) + + /** + * Add a ``name`` attribute to this Flow. + */ override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] = withAttributes(Attributes.name(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index ed1c966b65..7be555689b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -202,14 +202,27 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) } /** - * Change the attributes of this [[Flow]] to the given ones. Note that this + * Change the attributes of this [[Flow]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this * operation has no effect on an empty Flow (because the attributes apply * only to the contained processing stages). */ override def withAttributes(attr: Attributes): Repr[Out] = - if (this.module eq EmptyModule) this + if (isIdentity) this else new Flow(module.withAttributes(attr).nest()) + /** + * Add the given attributes to this Flow. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(module.attributes and attr) + + /** + * Add a ``name`` attribute to this Flow. + */ override def named(name: String): Repr[Out] = withAttributes(Attributes.name(name)) /** @@ -1585,7 +1598,9 @@ trait FlowOps[+Out, +Mat] { def withAttributes(attr: Attributes): Repr[Out] - def named(name: String): Repr[Out] = withAttributes(Attributes.name(name)) + def addAttributes(attr: Attributes): Repr[Out] + + def named(name: String): Repr[Out] /** INTERNAL API */ private[scaladsl] def andThen[T](op: SymbolicStage[Out, T]): Repr[T] = diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 0caa93b1b8..1a2d07037c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -904,6 +904,12 @@ object GraphDSL extends GraphApply { override def withAttributes(attr: Attributes): Repr[Out] = throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port") + override def addAttributes(attr: Attributes): Repr[Out] = + throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port") + + override def named(name: String): Repr[Out] = + throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port") + override def importAndGetPort(b: Builder[_]): Outlet[Out @uncheckedVariance] = outlet override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index bc02859882..686a7d3451 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -40,9 +40,28 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): Sink[In, Mat2] = new Sink(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ override def withAttributes(attr: Attributes): Sink[In, Mat] = new Sink(module.withAttributes(attr).nest()) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): Sink[In, Mat] = + withAttributes(module.attributes and attr) + + /** + * Add a ``name`` attribute to this Flow. + */ override def named(name: String): Sink[In, Mat] = withAttributes(Attributes.name(name)) /** Converts this Scala DSL element to it's Java DSL counterpart. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index cb648c947d..82586f5b99 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -110,13 +110,26 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) def runForeach(f: Out ⇒ Unit)(implicit materializer: Materializer): Future[Unit] = runWith(Sink.foreach(f)) /** - * Nests the current Source and returns a Source with the given Attributes - * @param attr the attributes to add - * @return a new Source with the added attributes + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). */ override def withAttributes(attr: Attributes): Repr[Out] = - new Source(module.withAttributes(attr).nest()) // User API + new Source(module.withAttributes(attr).nest()) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(module.attributes and attr) + + /** + * Add a ``name`` attribute to this Flow. + */ override def named(name: String): Repr[Out] = withAttributes(Attributes.name(name)) /** Converts this Scala DSL element to it's Java DSL counterpart. */